clients = new \SplObjectStorage;
    }

    /**
     * Announce the server address, build the WebSocket server, and run it.
     *
     * The server listens on port 6002 and wraps an anonymous message component
     * that keeps every open connection in $this->clients.
     */
    public function handle()
    {
        $this->info('WebSocket server starting on ws://118.31.172.116:6002');

        // Create the WebSocket server instance.
        $this->serverInstance = IoServer::factory(
            new HttpServer(
                new WsServer(
                    new class($this) implements MessageComponentInterface {
                        /** Owning command object; its clients storage is shared with this component. */
                        private $server;

                        public function __construct($server)
                        {
                            $this->server = $server;
                        }

                        /**
                         * Track a newly opened connection.
                         */
                        public function onOpen(ConnectionInterface $conn)
                        {
                            $this->server->clients->attach($conn);
                            echo "New connection! ({$conn->resourceId})\n";
                        }

                        /**
                         * Log the incoming message, pop one entry from the Redis list
                         * 'log_queue', and broadcast that entry to every tracked client.
                         *
                         * The incoming message only acts as a trigger; its content is not
                         * forwarded.
                         */
                        public function onMessage(ConnectionInterface $from, $msg)
                        {
                            echo "Message from {$from->resourceId}: $msg\n";

                            $queueName = 'log_queue'; // Queue name.
                            $logData = Redis::lpop($queueName);

                            // An empty queue yields null or false depending on the Redis
                            // client; broadcasting that would push an empty frame to
                            // every client, so skip the broadcast instead.
                            if ($logData === null || $logData === false) {
                                return;
                            }

                            foreach ($this->server->clients as $client) {
                                $client->send($logData);
                            }
                        }

                        /**
                         * Stop tracking a connection that has closed.
                         */
                        public function onClose(ConnectionInterface $conn)
                        {
                            $this->server->clients->detach($conn);
                            echo "Connection {$conn->resourceId} has disconnected\n";
                        }

                        /**
                         * Report the error and close the affected connection.
                         */
                        public function onError(ConnectionInterface $conn, \Exception $e)
                        {
                            echo "Error: {$e->getMessage()}\n";
                            $conn->close();
                        }
                    }
                )
            ),
            6002
        );

        // Start the WebSocket server.
        $this->serverInstance->run();
    }

    /**
     * Send a message to every tracked client.
     *
     * Does nothing until the server instance has been created by handle().
     *
     * @param mixed $message Payload handed unchanged to each client's send().
     */
    public function pushMessageToClients($message)
    {
        if (!$this->serverInstance) {
            return;
        }

        // Send the message to all connected clients.
        foreach ($this->clients as $client) {
            $client->send($message);
        }
    }
}