gogs пре 1 месец
родитељ
комит
140896e386
4 измењених фајлова са 169 додато и 31 уклоњено
  1. 86 0
      app/Console/Commands/MqttPusher.php
  2. 24 31
      app/Console/Commands/MqttSubscriber.php
  3. 58 0
      app/Jobs/PushDeviceJob.php
  4. 1 0
      composer.json

+ 86 - 0
app/Console/Commands/MqttPusher.php

@@ -0,0 +1,86 @@
+<?php
+
+namespace App\Console\Commands;
+
+use App\Service\Device\DeviceCommonService;
+use Illuminate\Console\Command;
+use PhpMqtt\Client\MqttClient;
+use PhpMqtt\Client\ConnectionSettings;
+class MqttPusher extends Command
+{
+    /**
+     * The name and signature of the console command.
+     *
+     * @var string
+     */
+    protected $signature = 'mqtt:pusher';
+
+    /**
+     * The console command description.
+     *
+     * @var string
+     */
+    protected $description = 'Command pusher';
+
+    /**
+     * Create a new command instance.
+     *
+     * @return void
+     */
+    public function __construct()
+    {
+        parent::__construct();
+    }
+
+    /**
+     * Execute the console command.
+     *
+     * @return mixed
+     */
+    // App/Console/Commands/MqttSubscriber.php
+    // App/Console/Commands/MqttPublisher.php
+    public function handle()
+    {
+        $service = new DeviceCommonService();
+        $deviceConfigs = $service->deviceList;
+
+        $this->info("发送进程启动(短连接模式)...");
+
+        $m = 3;
+        while (true) {
+            foreach ($deviceConfigs as $dev) {
+                try {
+                    // 1. 每次发送前创建新实例
+                    $mqtt = new MqttClient('47.111.77.194', 1883, 'PHP_PUB');
+                    $settings = (new ConnectionSettings)
+                        ->setUsername('HCWDL2')
+                        ->setPassword('HCWDL2')
+                        ->setConnectTimeout(5);
+
+                    // 2. 连接 -> 发送 -> 断开
+                    $mqtt->connect($settings, true);
+
+                    $pubTopic = "/HC/{$dev['type']}/WriteRealtimeData/{$dev['id']}";
+                    $hex = str_replace(' ', '', $dev['cmd']);
+
+                    $mqtt->publish($pubTopic, hex2bin($hex), 0);
+                    $this->info("【发送成功】 " . date('H:i:s') . " -> {$pubTopic}|{$hex}");
+
+                    $mqtt->disconnect(); // 主动断开,不占用服务端资源
+
+                } catch (\Exception $e) {
+                    $this->error("【发送异常】 " . date('H:i:s') . " | " . $e->getMessage());
+                }
+
+                // 3. 严格等待 30 秒
+                $this->info("等待 {$m} 秒中...");
+                sleep($m);
+            }
+
+            // 防止数组为空导致死循环空转
+            if (empty($deviceConfigs)) {
+                sleep(10);
+            }
+        }
+    }
+}

+ 24 - 31
app/Console/Commands/MqttSubscriber.php

@@ -2,6 +2,7 @@
 
 namespace App\Console\Commands;
 
+use App\Service\Device\DeviceCommonService;
 use Illuminate\Console\Command;
 use PhpMqtt\Client\MqttClient;
 use PhpMqtt\Client\ConnectionSettings;
@@ -36,44 +37,36 @@ class MqttSubscriber extends Command
      *
      * @return mixed
      */
+    // App/Console/Commands/MqttSubscriber.php
     public function handle()
     {
-        $server   = '47.111.77.194';
-        $port     = 1883;
-        $clientId = 'laravel_backend_worker';
+        $mqtt = new MqttClient('47.111.77.194', 1883, 'PHP_SUB_WORKER');
+        $mqtt->connect((new ConnectionSettings)->setUsername('yonglidev1')->setPassword('tZjUw0kQ'), true);
 
-        $mqtt = new MqttClient($server, $port, $clientId);
+        $this->info("接收进程已启动,监听中...");
+        $service = new DeviceCommonService();
+        $time = time();
+        // 使用通配符监听所有 Read 主题
+        $mqtt->subscribe('/HC/+/ReadRealtimeData/+', function ($topic, $message) use ($service,$time) {
+            if(time()- $time > 2){
+                $message =  bin2hex($message);
+                $this->info("【收到】 " . date('H:i:s') . " | Topic: {$topic} | 内容: " .$message);
+                // 1. 将字符串拆分为数组
+                $parts = explode('/', $topic);
 
-        $settings = (new ConnectionSettings)
-            ->setUsername('php_server_user')
-            ->setPassword('sEcrEt_pAss_123');
-
-        $this->info("正在连接到 MQTT Broker...");
-
-        $mqtt->connect($settings, true);
-
-        // 订阅主题
-        $mqtt->subscribe('/wy/119/RealtimeData/DT5/yonglidev1', function ($topic, $message) {
-            $this->info("收到消息: $message");
-
-            $data = json_decode($message, true);
-
-            if (isset($data['total_work'])) {
-                // 利用 Laravel 的模型直接入库
-                $hours = round($data['total_work'] / 60, 2);
-
-//                WorkLog::create([
-//                    'device_id' => $data['device_id'] ?? 'unknown',
-//                    'minutes'   => $data['total_work'],
-//                    'hours'     => $hours,
-//                    'raw_data'  => $message,
-//                ]);
-
-                $this->info("数据已入库: {$hours} 小时");
+                // 2. 获取第 4 位内容(注意数组下标从 0 开始)
+                // 路径:/ (0) HC (1) + (2) ReadRealtimeData (3) + (4)
+                $type = $parts[4] ?? '';
+                $service->dealDevice($type,$message, function($msg) {
+                    $this->info($msg); // 这里的 $this 指向 Command
+                });
             }
+
+            // 异步交给 Service 处理入库逻辑
+            // app(PlcDataService::class)->handle($topic, $message);
         }, 0);
 
-        // 开始死循环监听
+        // 接收进程只需要死循环 loop 即可
         $mqtt->loop(true);
     }
 }

+ 58 - 0
app/Jobs/PushDeviceJob.php

@@ -0,0 +1,58 @@
+<?php
+
+
+namespace App\Jobs;
+use Illuminate\Bus\Queueable;
+use Illuminate\Contracts\Queue\ShouldQueue;
+use Illuminate\Foundation\Bus\Dispatchable;
+use Illuminate\Queue\InteractsWithQueue;
+use Illuminate\Queue\SerializesModels;
+use Illuminate\Support\Facades\Log;
+
+class PushDeviceJob implements ShouldQueue
+{
+    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
+
+    protected $data;
+    protected $user;
+    protected $type;
+
+    public $timeout = 300;
+
+    public function __construct($data)
+    {
+        $this->data = $data;
+    }
+
+    public function handle()
+    {   $data = $this->data;
+        $this->sendToDeviceBatch($data);
+    }
+
+    public function sendToDeviceBatch($data){
+
+        $curl = curl_init();
+        $url = 'https://workapi.qingyaokeji.com/api/man_device_hc';
+        curl_setopt_array($curl, array(
+            CURLOPT_URL => $url,
+            CURLOPT_RETURNTRANSFER => true,
+            CURLOPT_ENCODING => '',
+            CURLOPT_MAXREDIRS => 10,
+            CURLOPT_TIMEOUT => 0,
+            CURLOPT_FOLLOWLOCATION => true,
+            CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
+            CURLOPT_CUSTOMREQUEST => 'POST',
+            CURLOPT_POSTFIELDS => json_encode($data),
+            CURLOPT_HTTPHEADER => array(
+                'Content-Type: application/json'
+            ),
+        ));
+
+        $response = curl_exec($curl);
+        Log::channel('apiLog')->info('request-JH', ["param" =>$data,"url"=>$url,"response"=>$response]);
+        curl_close($curl);
+        echo $response;
+
+    }
+
+}

+ 1 - 0
composer.json

@@ -16,6 +16,7 @@
         "laravel/framework": "^6.20.26",
         "laravel/tinker": "^2.5",
         "maatwebsite/excel": "^3.1",
+        "php-mqtt/client": "^1.8",
         "swooletw/laravel-swoole": "^2.13"
     },
     "require-dev": {