| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- <?php
- namespace App\Console\Commands;
- use App\Service\Device\DeviceCommonService;
- use Illuminate\Console\Command;
- use PhpMqtt\Client\MqttClient;
- use PhpMqtt\Client\ConnectionSettings;
class MqttSubscriber extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'mqtt:subscribe';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Subscribe to MQTT realtime-data topics and pass each payload to DeviceCommonService';

    /**
     * Execute the console command.
     *
     * Connects to the MQTT broker with a clean session, subscribes to
     * `/HC/+/ReadRealtimeData/+` at QoS 0 and then blocks inside the client
     * loop. Every accepted message is logged to the console (payload shown
     * as hex) and forwarded to DeviceCommonService::dealDevice().
     *
     * Connection settings are read from the optional `mqtt.*` config keys;
     * when a key is absent the value this command has always used is the
     * fallback, so an unconfigured deployment behaves exactly as before.
     * NOTE(review): the fallback credentials are still embedded in source
     * for backward compatibility. Define the `mqtt.*` keys (backed by
     * environment variables) and then remove these literals.
     *
     * @return void
     */
    public function handle()
    {
        $host = config('mqtt.host', '47.111.77.194');
        $port = (int) config('mqtt.port', 1883);
        $clientId = config('mqtt.client_id', 'PHP_SUB_WORKER');

        $settings = (new ConnectionSettings)
            ->setUsername(config('mqtt.username', 'yonglidev1'))
            ->setPassword(config('mqtt.password', 'tZjUw0kQ'));

        $mqtt = new MqttClient($host, $port, $clientId);
        // Second argument `true` requests a clean session.
        $mqtt->connect($settings, true);
        $this->info("接收进程已启动,监听中...");

        $service = new DeviceCommonService();

        // Captured BY VALUE below, so it always holds the start-up time and is
        // never refreshed: the check in the callback only drops messages that
        // arrive within the first 2 seconds after start-up; it is not a
        // rolling throttle.
        // NOTE(review): presumably meant to skip an initial burst of messages
        // right after subscribing. Confirm the intent.
        $startedAt = time();

        // Wildcard subscription covering every ReadRealtimeData topic.
        $mqtt->subscribe('/HC/+/ReadRealtimeData/+', function ($topic, $message) use ($service, $startedAt) {
            if (time() - $startedAt <= 2) {
                return;
            }

            // The raw payload is rendered as a hex string; that hex form is
            // what gets logged and what the service receives.
            $hexPayload = bin2hex($message);
            $this->info("【收到】 " . date('H:i:s') . " | Topic: {$topic} | 内容: " . $hexPayload);

            // The topic starts with '/', so explode() yields an empty first
            // element: '' (0) / HC (1) / + (2) / ReadRealtimeData (3) / + (4).
            // Index 4 is therefore the last topic segment; fall back to ''
            // when the topic has fewer segments.
            $segments = explode('/', $topic);
            $type = $segments[4] ?? '';

            // The third argument lets the service write progress lines to this
            // command's console output ($this is the Command inside the closure).
            $service->dealDevice($type, $hexPayload, function ($msg) {
                $this->info($msg);
            });
        }, 0);

        // A pure receiver only needs to stay in the client's event loop.
        $mqtt->loop(true);
    }
}
|