cqp 3 недель назад
Родитель
Сommit
9754a557be
2 измененных файлов с 143 добавлено и 68 удалено
  1. 141 66
      app/Jobs/ManDeviceJobLf.php
  2. 2 2
      config/u.php

+ 141 - 66
app/Jobs/ManDeviceJobLf.php

@@ -2,14 +2,13 @@
 
 namespace App\Jobs;
 
-use App\Model\DeviceDataLf;
-use App\Model\LfDevice;
 use App\Service\ClearDataService;
 use Illuminate\Bus\Queueable;
 use Illuminate\Contracts\Queue\ShouldQueue;
 use Illuminate\Foundation\Bus\Dispatchable;
 use Illuminate\Queue\InteractsWithQueue;
 use Illuminate\Queue\SerializesModels;
+use Illuminate\Support\Facades\Log;
 use Symfony\Component\Console\Output\OutputInterface;
 
 class ManDeviceJobLf implements ShouldQueue
@@ -30,7 +29,78 @@ class ManDeviceJobLf implements ShouldQueue
         $this->url = config('ip.zslfip');
     }
 
-    public $device = "0140142407030000927124847284,0140142407030000927124847285,0140142407030000927124847286,0140142407030000927124847287,0140142407030000927124848151,0140142407030000916024847613,0140142407030000916024847614,0140142407030000916024847615,0140142407030000916024847616,0140142407030000916024847617,0140142407030000937124849723,0140142407030000937124849724,0140142407030000937124849725,0140142407030000937124849726,0140142407030000937124849727,0140142407030000937124849728,0140142407030000937124849729,0140142407030000937124849730,0140142407030000894824848319,0140142407030000894824848320,0140142407030000894824848321,0140142407030000894824848322,0140142407030000894824848323,0140142407030000894824848324,0140142407030000894824848325,0140142407030000894824848326";
+//    public function handle()
+//    {
+//        try{
+//            if(empty($this->data['data'])) return;
+//
+//            $deviceId = $this->data['data']['deviceId'];
+//            if($this->data['type'] == "dataPoint"){
+//                foreach ($this->data['data']['dataPoints'] as $value){
+//                    $dev_eui = $deviceId . $value['dataPointId'];
+//
+//                    if(empty($value['value'])) continue;
+//
+//                    //发送给朗峰
+//                    $this->sendToDevice(['dev_eui' => $dev_eui, 'value' => $value['value']]);
+//                }
+//            }
+//        }catch (\Exception $exception){
+//            file_put_contents('send_man_error_lf.txt',date("Y-m-d H:i:s").json_encode($this->data).PHP_EOL.$exception->getMessage().$exception->getLine().$exception->getFile().PHP_EOL,8);
+//            $this->delete();
+//        }
+//    }
+
+//    public function sendToDevice($data){
+//        list($status,$token) = ClearDataService::getTokenLf();
+//        if(! $status) return;
+//
+//        $url = $this->url . "api/module-data/device_machine_record/device_machine_record/diy/create_single_data";
+//        $post = [
+//            'bizId' => -1,
+//            'bizTypeEk' => 'LOWCODE',
+//            'data' => [
+//                'device_machine_record' => [
+//                    'machine_code' => $data['dev_eui'],
+//                    'param_value' => floatval($data['value']),
+//                    'record_time' => $this->getNowDay()
+//                ]
+//            ],
+//            'dynamicFormId' => '477743923368955904',
+//            'showModelId' => '477745421456904192'
+//        ];
+//        $header = ["Authorization: Bearer {$token}","Content-Type:application/json","Site:LFMY"];
+//
+//        $curl = curl_init();
+//        curl_setopt_array($curl, array(
+//            CURLOPT_URL => $url,
+//            CURLOPT_RETURNTRANSFER => true,
+//            CURLOPT_ENCODING => '',
+//            CURLOPT_MAXREDIRS => 10,
+//            CURLOPT_TIMEOUT => 15,
+//            CURLOPT_FOLLOWLOCATION => true,
+//            CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
+//            CURLOPT_CUSTOMREQUEST => 'POST',
+//            CURLOPT_POSTFIELDS => json_encode($post),
+//            CURLOPT_SSL_VERIFYPEER => false,
+//            CURLOPT_HTTPHEADER => $header,
+//        ));
+//        $response = curl_exec($curl);
+//        if ($response === false) {
+//            // 获取错误号
+//            $errorNumber = curl_errno($curl);
+//            // 获取错误信息
+//            $errorMessage = curl_error($curl);
+//            $message = "cURL Error #{$errorNumber}: {$errorMessage}";
+//            file_put_contents('record_man_lf_error.txt',date('Y-m-d H:i:s'). PHP_EOL . $message .PHP_EOL,8);
+//        }
+//        curl_close($curl);
+//
+//        $res = json_decode($response,true);
+//        if(! isset($res['status'])){//报错了
+//            file_put_contents('record_man_lf_error.txt',date('Y-m-d H:i:s'). PHP_EOL . $response .PHP_EOL.json_encode($post),8);
+//        }
+//    }
 
     /**
      * Execute the job.
@@ -39,61 +109,82 @@ class ManDeviceJobLf implements ShouldQueue
      */
     public function handle()
     {
-        try{
-            if(empty($this->data['data'])) return;
-
-            $device = explode(',',$this->device);
+        try {
+            if (empty($this->data['data'])) return;
 
             $deviceId = $this->data['data']['deviceId'];
-            $deviceName = $this->data['data']['deviceName'];
-            $ip = self::getIP();
-            $data_type = 3;
-            if($this->data['type'] == "dataPoint"){
-                $insert = [];
-                foreach ($this->data['data']['dataPoints'] as $value){
-                    $dev_eui = $deviceId . $value['dataPointId'];
-
-//                    if(! in_array($dev_eui,$device)) continue;
-
-                    if(empty($value['value'])) continue;
-
-                    //发送给朗峰
-                    $this->sendToDevice(['dev_eui' => $dev_eui, 'value' => $value['value']]);
-
-//                    $insert[] = [
-//                        'happening_data' => $value['value'],
-//                        'dev_eui' => $dev_eui,
-//                        'device_name' => $deviceName,
-//                        'source_ip' => $ip,
-//                        'data_type' => $data_type,
-//                        'crt_time' => time()
-//                    ];
+            $dataPoints = $this->data['data']['dataPoints'] ?? [];
+
+            if ($this->data['type'] == "dataPoint") {
+                $redisKey = 'buffer:langfeng_data';
+                $batchSize = 500;
+
+                // 1. 整理数据并推入 Redis 列表
+                $records = [];
+                foreach ($dataPoints as $value) {
+                    if (empty($value['value'])) continue;
+
+                    $records[] = json_encode([
+                        'machine_code' => $deviceId . $value['dataPointId'],
+                        'param_value'   => floatval($value['value']),
+                        'time'    => $this->getNowDay()
+                    ]);
+                }
+
+                if (!empty($records)) {
+                    // 批量推入 Redis
+                    \Illuminate\Support\Facades\Redis::rpush($redisKey, ...$records);
+
+                    // 【核心修改点】:每次写入数据都重置 Key 的过期时间为 30 分钟 (1800秒)
+                    // 只要持续有数据进来,这个 Key 就一直有效
+                    // 如果超过 30 分钟没新数据进来(设备关机/下班),Redis 自动删除该 Key
+                    \Illuminate\Support\Facades\Redis::expire($redisKey, 1800);
+                }
+
+                // 2. 检查 Redis 里的总条数
+                $currentCount = \Illuminate\Support\Facades\Redis::llen($redisKey);
+
+                // 3. 达到 500 条,执行批量发送
+
+                if ($currentCount >= $batchSize) {
+                    $this->processBatchSend($redisKey, $batchSize);
                 }
-//                DeviceDataLf::insert($insert);
             }
-        }catch (\Exception $exception){
-            file_put_contents('send_man_error_lf.txt',date("Y-m-d H:i:s").json_encode($this->data).PHP_EOL.$exception->getMessage().$exception->getLine().$exception->getFile().PHP_EOL,8);
+        } catch (\Exception $exception) {
+            Log::channel('apiLog')->info('lf开始位置', ["param" => $exception->getMessage()]);
             $this->delete();
         }
     }
 
-    public function sendToDevice($data){
+    /**
+     * 批量处理发送逻辑
+     */
+    protected function processBatchSend($redisKey, $batchSize)
+    {
+        // 原子化弹出 500 条数据
+        $rawRecords = \Illuminate\Support\Facades\Redis::pipeline(function ($pipe) use ($redisKey, $batchSize) {
+            for ($i = 0; $i < $batchSize; $i++) {
+                $pipe->lpop($redisKey);
+            }
+        });
+
+        $batchData = array_map(fn($item) => json_decode($item, true), array_filter($rawRecords));
+
+        if (empty($batchData)) return;
+
+        list($status,$msg) = $this->sendToDeviceBatch($batchData);
+        if(! $status) echo $msg;
+    }
+
+    public function sendToDeviceBatch($data){
         list($status,$token) = ClearDataService::getTokenLf();
-        if(! $status) return;
+        if(! $status) return [false, $token];
 
-        $url = $this->url . "api/module-data/device_machine_record/device_machine_record/diy/create_single_data";
+        $url = $this->url . "api/module-data/hc_device_machine_record/hc_device_machine_record/diy/create_data";
         $post = [
-            'bizId' => -1,
-            'bizTypeEk' => 'LOWCODE',
             'data' => [
-                'device_machine_record' => [
-                    'machine_code' => $data['dev_eui'],
-                    'param_value' => floatval($data['value']),
-                    'record_time' => $this->getNowDay()
-                ]
-            ],
-            'dynamicFormId' => '477743923368955904',
-            'showModelId' => '477745421456904192'
+                'device_machine_record' => $data
+            ]
         ];
         $header = ["Authorization: Bearer {$token}","Content-Type:application/json","Site:LFMY"];
 
@@ -103,7 +194,7 @@ class ManDeviceJobLf implements ShouldQueue
             CURLOPT_RETURNTRANSFER => true,
             CURLOPT_ENCODING => '',
             CURLOPT_MAXREDIRS => 10,
-            CURLOPT_TIMEOUT => 0,
+            CURLOPT_TIMEOUT => 15,
             CURLOPT_FOLLOWLOCATION => true,
             CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
             CURLOPT_CUSTOMREQUEST => 'POST',
@@ -118,32 +209,16 @@ class ManDeviceJobLf implements ShouldQueue
             // 获取错误信息
             $errorMessage = curl_error($curl);
             $message = "cURL Error #{$errorNumber}: {$errorMessage}";
-            file_put_contents('record_man_lf_error.txt',date('Y-m-d H:i:s'). PHP_EOL . $message .PHP_EOL,8);
+            Log::channel('apiLog')->info('lf写入数据', ["param" => $message]);
         }
         curl_close($curl);
 
         $res = json_decode($response,true);
         if(! isset($res['status'])){//报错了
-            file_put_contents('record_man_lf_error.txt',date('Y-m-d H:i:s'). PHP_EOL . $response .PHP_EOL.json_encode($post),8);
+            Log::channel('apiLog')->info('lf写入数据返回错误', ["param" => $response]);
         }
-    }
 
-    public static function getIP(){
-        if (getenv('HTTP_CLIENT_IP')) {
-            $ip = getenv('HTTP_CLIENT_IP');
-        }
-        elseif (getenv('HTTP_X_REAL_IP')) {
-            $ip = getenv('HTTP_X_REAL_IP');
-        } elseif (getenv('HTTP_X_FORWARDED_FOR')) {
-            $ip = getenv('HTTP_X_FORWARDED_FOR');
-            $ips = explode(',', $ip);
-            $ip = $ips[0];
-        } elseif (getenv('REMOTE_ADDR')) {
-            $ip = getenv('REMOTE_ADDR');
-        } else {
-            $ip = '0.0.0.0';
-        }
-        return $ip;
+        return [true, ''];
     }
 
     protected function echoMessage(OutputInterface $output)

+ 2 - 2
config/u.php

@@ -6,8 +6,8 @@ return [
         'api_port' => '47438',
     ],
     "HCLT" => [
-        'api_host' => '',
-        'api_port' => '',
+        'api_host' => '1ag17727201hq.vicp.fun',
+        'api_port' => '46227',
     ],
     "JLWM" => [
         'api_host' => '',