ManDeviceJobLf.php 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. <?php
  2. namespace App\Jobs;
  3. use App\Service\ClearDataService;
  4. use Illuminate\Bus\Queueable;
  5. use Illuminate\Contracts\Queue\ShouldQueue;
  6. use Illuminate\Foundation\Bus\Dispatchable;
  7. use Illuminate\Queue\InteractsWithQueue;
  8. use Illuminate\Queue\SerializesModels;
  9. use Illuminate\Support\Facades\Log;
  10. use Symfony\Component\Console\Output\OutputInterface;
  11. class ManDeviceJobLf implements ShouldQueue
  12. {
  13. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  14. protected $data;
  15. protected $url;
  16. /**
  17. * Create a new job instance.
  18. *
  19. * @return void
  20. */
  21. public function __construct($data)
  22. {
  23. $this->data = $data;
  24. $this->url = config('ip.zslfip');
  25. }
  26. // public function handle()
  27. // {
  28. // try{
  29. // if(empty($this->data['data'])) return;
  30. //
  31. // $deviceId = $this->data['data']['deviceId'];
  32. // if($this->data['type'] == "dataPoint"){
  33. // foreach ($this->data['data']['dataPoints'] as $value){
  34. // $dev_eui = $deviceId . $value['dataPointId'];
  35. //
  36. // if(empty($value['value'])) continue;
  37. //
  38. // //发送给朗峰
  39. // $this->sendToDevice(['dev_eui' => $dev_eui, 'value' => $value['value']]);
  40. // }
  41. // }
  42. // }catch (\Exception $exception){
  43. // file_put_contents('send_man_error_lf.txt',date("Y-m-d H:i:s").json_encode($this->data).PHP_EOL.$exception->getMessage().$exception->getLine().$exception->getFile().PHP_EOL,8);
  44. // $this->delete();
  45. // }
  46. // }
  47. // public function sendToDevice($data){
  48. // list($status,$token) = ClearDataService::getTokenLf();
  49. // if(! $status) return;
  50. //
  51. // $url = $this->url . "api/module-data/device_machine_record/device_machine_record/diy/create_single_data";
  52. // $post = [
  53. // 'bizId' => -1,
  54. // 'bizTypeEk' => 'LOWCODE',
  55. // 'data' => [
  56. // 'device_machine_record' => [
  57. // 'machine_code' => $data['dev_eui'],
  58. // 'param_value' => floatval($data['value']),
  59. // 'record_time' => $this->getNowDay()
  60. // ]
  61. // ],
  62. // 'dynamicFormId' => '477743923368955904',
  63. // 'showModelId' => '477745421456904192'
  64. // ];
  65. // $header = ["Authorization: Bearer {$token}","Content-Type:application/json","Site:LFMY"];
  66. //
  67. // $curl = curl_init();
  68. // curl_setopt_array($curl, array(
  69. // CURLOPT_URL => $url,
  70. // CURLOPT_RETURNTRANSFER => true,
  71. // CURLOPT_ENCODING => '',
  72. // CURLOPT_MAXREDIRS => 10,
  73. // CURLOPT_TIMEOUT => 15,
  74. // CURLOPT_FOLLOWLOCATION => true,
  75. // CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
  76. // CURLOPT_CUSTOMREQUEST => 'POST',
  77. // CURLOPT_POSTFIELDS => json_encode($post),
  78. // CURLOPT_SSL_VERIFYPEER => false,
  79. // CURLOPT_HTTPHEADER => $header,
  80. // ));
  81. // $response = curl_exec($curl);
  82. // if ($response === false) {
  83. // // 获取错误号
  84. // $errorNumber = curl_errno($curl);
  85. // // 获取错误信息
  86. // $errorMessage = curl_error($curl);
  87. // $message = "cURL Error #{$errorNumber}: {$errorMessage}";
  88. // file_put_contents('record_man_lf_error.txt',date('Y-m-d H:i:s'). PHP_EOL . $message .PHP_EOL,8);
  89. // }
  90. // curl_close($curl);
  91. //
  92. // $res = json_decode($response,true);
  93. // if(! isset($res['status'])){//报错了
  94. // file_put_contents('record_man_lf_error.txt',date('Y-m-d H:i:s'). PHP_EOL . $response .PHP_EOL.json_encode($post),8);
  95. // }
  96. // }
  97. /**
  98. * Execute the job.
  99. *
  100. * @return void
  101. */
  102. public function handle()
  103. {
  104. try {
  105. if (empty($this->data['data'])) return;
  106. $deviceId = $this->data['data']['deviceId'];
  107. $dataPoints = $this->data['data']['dataPoints'] ?? [];
  108. if ($this->data['type'] == "dataPoint") {
  109. $redisKey = 'buffer:langfeng_data';
  110. $batchSize = 500;
  111. // 1. 整理数据并推入 Redis 列表
  112. $records = [];
  113. foreach ($dataPoints as $value) {
  114. if (empty($value['value'])) continue;
  115. $records[] = json_encode([
  116. 'machine_code' => $deviceId . $value['dataPointId'],
  117. 'param_value' => floatval($value['value']),
  118. 'time' => $this->getNowDay()
  119. ]);
  120. }
  121. if (!empty($records)) {
  122. // 批量推入 Redis
  123. \Illuminate\Support\Facades\Redis::rpush($redisKey, ...$records);
  124. // 【核心修改点】:每次写入数据都重置 Key 的过期时间为 30 分钟 (1800秒)
  125. // 只要持续有数据进来,这个 Key 就一直有效
  126. // 如果超过 30 分钟没新数据进来(设备关机/下班),Redis 自动删除该 Key
  127. \Illuminate\Support\Facades\Redis::expire($redisKey, 1800);
  128. }
  129. // 2. 检查 Redis 里的总条数
  130. $currentCount = \Illuminate\Support\Facades\Redis::llen($redisKey);
  131. // 3. 达到 500 条,执行批量发送
  132. if ($currentCount >= $batchSize) {
  133. $this->processBatchSend($redisKey, $batchSize);
  134. }
  135. }
  136. } catch (\Exception $exception) {
  137. Log::channel('apiLog')->info('lf开始位置', ["param" => $exception->getMessage()]);
  138. $this->delete();
  139. }
  140. }
  141. /**
  142. * 批量处理发送逻辑
  143. */
  144. protected function processBatchSend($redisKey, $batchSize)
  145. {
  146. // 原子化弹出 500 条数据
  147. $rawRecords = \Illuminate\Support\Facades\Redis::pipeline(function ($pipe) use ($redisKey, $batchSize) {
  148. for ($i = 0; $i < $batchSize; $i++) {
  149. $pipe->lpop($redisKey);
  150. }
  151. });
  152. $batchData = array_map(fn($item) => json_decode($item, true), array_filter($rawRecords));
  153. if (empty($batchData)) return;
  154. list($status,$msg) = $this->sendToDeviceBatch($batchData);
  155. if(! $status) echo $msg;
  156. }
  157. public function sendToDeviceBatch($data){
  158. list($status,$token) = ClearDataService::getTokenLf();
  159. if(! $status) return [false, $token];
  160. $url = $this->url . "api/module-data/hc_device_machine_record/hc_device_machine_record/diy/create_data";
  161. $post = [
  162. 'data' => [
  163. 'device_machine_record' => $data
  164. ]
  165. ];
  166. $header = ["Authorization: Bearer {$token}","Content-Type:application/json","Site:LFMY"];
  167. $curl = curl_init();
  168. curl_setopt_array($curl, array(
  169. CURLOPT_URL => $url,
  170. CURLOPT_RETURNTRANSFER => true,
  171. CURLOPT_ENCODING => '',
  172. CURLOPT_MAXREDIRS => 10,
  173. CURLOPT_TIMEOUT => 15,
  174. CURLOPT_FOLLOWLOCATION => true,
  175. CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
  176. CURLOPT_CUSTOMREQUEST => 'POST',
  177. CURLOPT_POSTFIELDS => json_encode($post),
  178. CURLOPT_SSL_VERIFYPEER => false,
  179. CURLOPT_HTTPHEADER => $header,
  180. ));
  181. $response = curl_exec($curl);
  182. if ($response === false) {
  183. // 获取错误号
  184. $errorNumber = curl_errno($curl);
  185. // 获取错误信息
  186. $errorMessage = curl_error($curl);
  187. $message = "cURL Error #{$errorNumber}: {$errorMessage}";
  188. Log::channel('apiLog')->info('lf写入数据', ["param" => $message]);
  189. }
  190. curl_close($curl);
  191. $res = json_decode($response,true);
  192. if(! isset($res['status'])){//报错了
  193. Log::channel('apiLog')->info('lf写入数据返回错误', ["param" => $response]);
  194. }
  195. return [true, ''];
  196. }
  197. protected function echoMessage(OutputInterface $output)
  198. {
  199. $output->writeln($this->data);
  200. }
  201. function getNowDay(){
  202. // 获取当前时间
  203. $currentDateTime = new \DateTime();
  204. // 设置时区为 PRC
  205. $currentDateTime->setTimezone(new \DateTimeZone('UTC'));
  206. // 格式化时间为 "2023-09-21T16:00:00.000Z" 的格式
  207. $formattedDateTime = $currentDateTime->format('Y-m-d\TH:i:s.000\Z');
  208. return $formattedDateTime;
  209. }
  210. }