MqttSubscriber.php 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. <?php
  2. namespace App\Console\Commands;
  3. use Illuminate\Console\Command;
  4. use PhpMqtt\Client\MqttClient;
  5. use PhpMqtt\Client\ConnectionSettings;
  6. class MqttSubscriber extends Command
  7. {
  8. /**
  9. * The name and signature of the console command.
  10. *
  11. * @var string
  12. */
  13. protected $signature = 'mqtt:subscribe';
  14. /**
  15. * The console command description.
  16. *
  17. * @var string
  18. */
  19. protected $description = 'Command description';
  20. /**
  21. * Create a new command instance.
  22. *
  23. * @return void
  24. */
  25. public function __construct()
  26. {
  27. parent::__construct();
  28. }
  29. /**
  30. * Execute the console command.
  31. *
  32. * @return mixed
  33. */
  34. public function handle()
  35. {
  36. $server = '47.111.77.194';
  37. $port = 1883;
  38. $clientId = 'laravel_backend_worker';
  39. $mqtt = new MqttClient($server, $port, $clientId);
  40. $settings = (new ConnectionSettings)
  41. ->setUsername('php_server_user')
  42. ->setPassword('sEcrEt_pAss_123');
  43. $this->info("正在连接到 MQTT Broker...");
  44. $mqtt->connect($settings, true);
  45. // 订阅主题
  46. $mqtt->subscribe('/wy/119/RealtimeData/DT5/yonglidev1', function ($topic, $message) {
  47. $this->info("收到消息: $message");
  48. $data = json_decode($message, true);
  49. if (isset($data['total_work'])) {
  50. // 利用 Laravel 的模型直接入库
  51. $hours = round($data['total_work'] / 60, 2);
  52. // WorkLog::create([
  53. // 'device_id' => $data['device_id'] ?? 'unknown',
  54. // 'minutes' => $data['total_work'],
  55. // 'hours' => $hours,
  56. // 'raw_data' => $message,
  57. // ]);
  58. $this->info("数据已入库: {$hours} 小时");
  59. }
  60. }, 0);
  61. // 开始死循环监听
  62. $mqtt->loop(true);
  63. }
  64. }