MqttSubscriber.php 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. <?php
  2. namespace App\Console\Commands;
  3. use App\Service\Device\DeviceCommonService;
  4. use Illuminate\Console\Command;
  5. use PhpMqtt\Client\MqttClient;
  6. use PhpMqtt\Client\ConnectionSettings;
  7. class MqttSubscriber extends Command
  8. {
  9. /**
  10. * The name and signature of the console command.
  11. *
  12. * @var string
  13. */
  14. protected $signature = 'mqtt:subscribe';
  15. /**
  16. * The console command description.
  17. *
  18. * @var string
  19. */
  20. protected $description = 'Command description';
  21. /**
  22. * Create a new command instance.
  23. *
  24. * @return void
  25. */
  26. public function __construct()
  27. {
  28. parent::__construct();
  29. }
  30. /**
  31. * Execute the console command.
  32. *
  33. * @return mixed
  34. */
  35. // App/Console/Commands/MqttSubscriber.php
  36. public function handle()
  37. {
  38. $mqtt = new MqttClient('47.111.77.194', 1883, 'PHP_SUB_WORKER');
  39. $mqtt->connect((new ConnectionSettings)->setUsername('yonglidev1')->setPassword('tZjUw0kQ'), true);
  40. $this->info("接收进程已启动,监听中...");
  41. $service = new DeviceCommonService();
  42. $time = time();
  43. // 使用通配符监听所有 Read 主题
  44. $mqtt->subscribe('/HC/+/ReadRealtimeData/+', function ($topic, $message) use ($service,$time) {
  45. if(time()- $time > 2){
  46. $message = bin2hex($message);
  47. $this->info("【收到】 " . date('H:i:s') . " | Topic: {$topic} | 内容: " .$message);
  48. // 1. 将字符串拆分为数组
  49. $parts = explode('/', $topic);
  50. // 2. 获取第 4 位内容(注意数组下标从 0 开始)
  51. // 路径:/ (0) HC (1) + (2) ReadRealtimeData (3) + (4)
  52. $type = $parts[4] ?? '';
  53. $service->dealDevice($type,$message, function($msg) {
  54. $this->info($msg); // 这里的 $this 指向 Command
  55. });
  56. }
  57. // 异步交给 Service 处理入库逻辑
  58. // app(PlcDataService::class)->handle($topic, $message);
  59. }, 0);
  60. // 接收进程只需要死循环 loop 即可
  61. $mqtt->loop(true);
  62. }
  63. }