ManDeviceJobJLWM.php 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. <?php
  2. namespace App\Jobs;
  3. use App\Service\ClearDataService;
  4. use Illuminate\Bus\Queueable;
  5. use Illuminate\Contracts\Queue\ShouldQueue;
  6. use Illuminate\Foundation\Bus\Dispatchable;
  7. use Illuminate\Queue\InteractsWithQueue;
  8. use Illuminate\Queue\SerializesModels;
  9. use Illuminate\Support\Facades\Log;
  10. use Symfony\Component\Console\Output\OutputInterface;
  11. class ManDeviceJobJLWM implements ShouldQueue
  12. {
  13. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  14. protected $data;
  15. protected $url;
  16. /**
  17. * Create a new job instance.
  18. *
  19. * @return void
  20. */
  21. public function __construct($data)
  22. {
  23. $this->data = $data;
  24. $this->url = config('ip.zslf');
  25. }
  26. /**
  27. * Execute the job.
  28. *
  29. * @return void
  30. */
  31. public function handle()
  32. {
  33. try {
  34. if (empty($this->data['data'])) return;
  35. $deviceId = $this->data['data']['deviceId'];
  36. $dataPoints = $this->data['data']['dataPoints'] ?? [];
  37. if ($this->data['type'] == "dataPoint") {
  38. $redisKey = 'buffer:jialiwumei_data';
  39. $batchSize = 500;
  40. // 1. 整理数据并推入 Redis 列表
  41. $records = [];
  42. foreach ($dataPoints as $value) {
  43. if (empty($value['value'])) continue;
  44. $records[] = json_encode([
  45. 'machine_code' => $deviceId . $value['dataPointId'],
  46. 'param_value' => floatval($value['value']),
  47. 'time' => $this->getNowDay()
  48. ]);
  49. }
  50. if (!empty($records)) {
  51. // 批量推入 Redis
  52. \Illuminate\Support\Facades\Redis::rpush($redisKey, ...$records);
  53. // 【核心修改点】:每次写入数据都重置 Key 的过期时间为 30 分钟 (1800秒)
  54. // 只要持续有数据进来,这个 Key 就一直有效
  55. // 如果超过 30 分钟没新数据进来(设备关机/下班),Redis 自动删除该 Key
  56. \Illuminate\Support\Facades\Redis::expire($redisKey, 1800);
  57. }
  58. // 2. 检查 Redis 里的总条数
  59. $currentCount = \Illuminate\Support\Facades\Redis::llen($redisKey);
  60. // 3. 达到 500 条,执行批量发送
  61. if ($currentCount >= $batchSize) {
  62. $this->processBatchSend($redisKey, $batchSize);
  63. }
  64. }
  65. } catch (\Exception $exception) {
  66. Log::channel('apiLog')->info('jlwm开始位置', ["param" => $exception->getMessage()]);
  67. $this->delete();
  68. }
  69. }
  70. /**
  71. * 批量处理发送逻辑
  72. */
  73. protected function processBatchSend($redisKey, $batchSize)
  74. {
  75. // 原子化弹出 500 条数据
  76. $rawRecords = \Illuminate\Support\Facades\Redis::pipeline(function ($pipe) use ($redisKey, $batchSize) {
  77. for ($i = 0; $i < $batchSize; $i++) {
  78. $pipe->lpop($redisKey);
  79. }
  80. });
  81. $batchData = array_values(array_map(fn($item) => json_decode($item, true), array_filter($rawRecords)));
  82. if (empty($batchData)) return;
  83. list($status,$msg) = $this->sendToDeviceBatch($batchData);
  84. if(! $status) echo $msg;
  85. }
  86. public function sendToDeviceBatch($data){
  87. list($status,$token) = ClearDataService::getTokenJLWM();
  88. if(! $status) return [false, $token];
  89. $url = $this->url . "api/module-data/jlwm_device_machine_record/jlwm_device_machine_record/diy/create_data";
  90. $post = [
  91. 'data' => [
  92. 'device_machine_record' => $data
  93. ]
  94. ];
  95. $header = ["Authorization: Bearer {$token}","Content-Type:application/json","Site:JLWM"];
  96. $curl = curl_init();
  97. curl_setopt_array($curl, array(
  98. CURLOPT_URL => $url,
  99. CURLOPT_RETURNTRANSFER => true,
  100. CURLOPT_ENCODING => '',
  101. CURLOPT_MAXREDIRS => 10,
  102. CURLOPT_TIMEOUT => 15,
  103. CURLOPT_FOLLOWLOCATION => true,
  104. CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
  105. CURLOPT_CUSTOMREQUEST => 'POST',
  106. CURLOPT_POSTFIELDS => json_encode($post),
  107. CURLOPT_SSL_VERIFYPEER => false,
  108. CURLOPT_HTTPHEADER => $header,
  109. ));
  110. $response = curl_exec($curl);
  111. if ($response === false) {
  112. // 获取错误号
  113. $errorNumber = curl_errno($curl);
  114. // 获取错误信息
  115. $errorMessage = curl_error($curl);
  116. $message = "cURL Error #{$errorNumber}: {$errorMessage}";
  117. Log::channel('apiLog')->info('jlwm写入数据', ["param" => $message]);
  118. }
  119. curl_close($curl);
  120. $res = json_decode($response,true);
  121. if(! isset($res['status'])){//报错了
  122. Log::channel('apiLog')->info('jlwm写入数据返回错误', ["param" => $response]);
  123. }
  124. return [true, ''];
  125. }
  126. protected function echoMessage(OutputInterface $output)
  127. {
  128. $output->writeln($this->data);
  129. }
  130. function getNowDay(){
  131. // 获取当前时间
  132. $currentDateTime = new \DateTime();
  133. // 设置时区为 PRC
  134. $currentDateTime->setTimezone(new \DateTimeZone('UTC'));
  135. // 格式化时间为 "2023-09-21T16:00:00.000Z" 的格式
  136. $formattedDateTime = $currentDateTime->format('Y-m-d\TH:i:s.000\Z');
  137. return $formattedDateTime;
  138. }
  139. }