ManDeviceJobHc.php 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. <?php
  2. namespace App\Jobs;
  3. use App\Service\ClearDataService;
  4. use Illuminate\Bus\Queueable;
  5. use Illuminate\Contracts\Queue\ShouldQueue;
  6. use Illuminate\Foundation\Bus\Dispatchable;
  7. use Illuminate\Queue\InteractsWithQueue;
  8. use Illuminate\Queue\SerializesModels;
  9. use Illuminate\Support\Facades\Log;
  10. use Symfony\Component\Console\Output\OutputInterface;
  11. class ManDeviceJobHc implements ShouldQueue
  12. {
  13. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  14. protected $data;
  15. protected $url;
  16. /**
  17. * Create a new job instance.
  18. *
  19. * @return void
  20. */
  21. public function __construct($data)
  22. {
  23. $this->data = $data;
  24. $this->url = config('ip.zslf');
  25. }
  26. public function handle()
  27. {
  28. try {
  29. if (empty($this->data['data'])) return;
  30. $deviceId = $this->data['data']['deviceId'];
  31. $dataPoints = $this->data['data']['dataPoints'] ?? [];
  32. if ($this->data['type'] == "dataPoint") {
  33. $redisKey = 'buffer:hengchang_data';
  34. $batchSize = 500;
  35. // 1. 整理数据并推入 Redis 列表
  36. $records = [];
  37. foreach ($dataPoints as $value) {
  38. if (empty($value['value'])) continue;
  39. $records[] = json_encode([
  40. 'machine_code' => $deviceId . $value['dataPointId'],
  41. 'param_value' => floatval($value['value']),
  42. 'time' => $this->getNowDay()
  43. ]);
  44. }
  45. if (!empty($records)) {
  46. // 批量推入 Redis
  47. \Illuminate\Support\Facades\Redis::rpush($redisKey, ...$records);
  48. // 【核心修改点】:每次写入数据都重置 Key 的过期时间为 30 分钟 (1800秒)
  49. // 只要持续有数据进来,这个 Key 就一直有效
  50. // 如果超过 30 分钟没新数据进来(设备关机/下班),Redis 自动删除该 Key
  51. \Illuminate\Support\Facades\Redis::expire($redisKey, 1800);
  52. }
  53. // 2. 检查 Redis 里的总条数
  54. $currentCount = \Illuminate\Support\Facades\Redis::llen($redisKey);
  55. // 3. 达到 500 条,执行批量发送
  56. if ($currentCount >= $batchSize) {
  57. $this->processBatchSend($redisKey, $batchSize);
  58. }
  59. }
  60. } catch (\Exception $exception) {
  61. Log::channel('apiLog')->info('hc开始位置异常', ["param" => $exception->getMessage()]);
  62. // 发生异常时可以根据需要选择是否 delete 或 release
  63. $this->delete();
  64. }
  65. }
  66. /**
  67. * Execute the job.
  68. *
  69. * @return void
  70. */
  71. // public function handle()
  72. // {
  73. // try {
  74. // if (empty($this->data['data'])) return;
  75. //
  76. // $deviceId = $this->data['data']['deviceId'];
  77. // $dataPoints = $this->data['data']['dataPoints'] ?? [];
  78. //
  79. // if ($this->data['type'] == "dataPoint") {
  80. // $redisKey = 'buffer:hengchang_data';
  81. // $batchSize = 500;
  82. //
  83. // // 1. 整理数据并推入 Redis 列表
  84. // $records = [];
  85. // foreach ($dataPoints as $value) {
  86. // if (empty($value['value'])) continue;
  87. //
  88. // $records[] = json_encode([
  89. // 'machine_code' => $deviceId . $value['dataPointId'],
  90. // 'param_value' => floatval($value['value']),
  91. // 'time' => $this->getNowDay()
  92. // ]);
  93. // }
  94. //
  95. // if (!empty($records)) {
  96. // // 批量推入 Redis
  97. // \Illuminate\Support\Facades\Redis::rpush($redisKey, ...$records);
  98. // }
  99. //
  100. // // 2. 检查 Redis 里的总条数
  101. // $currentCount = \Illuminate\Support\Facades\Redis::llen($redisKey);
  102. //
  103. // // 3. 达到 500 条,执行批量发送
  104. //
  105. // if ($currentCount >= $batchSize) {
  106. // $this->processBatchSend($redisKey, $batchSize);
  107. // }
  108. // }
  109. // } catch (\Exception $exception) {
  110. // Log::channel('apiLog')->info('hc开始位置', ["param" => $exception->getMessage()]);
  111. // $this->delete();
  112. // }
  113. // }
  114. /**
  115. * 批量处理发送逻辑
  116. */
  117. protected function processBatchSend($redisKey, $batchSize)
  118. {
  119. // 原子化弹出 500 条数据
  120. $rawRecords = \Illuminate\Support\Facades\Redis::pipeline(function ($pipe) use ($redisKey, $batchSize) {
  121. for ($i = 0; $i < $batchSize; $i++) {
  122. $pipe->lpop($redisKey);
  123. }
  124. });
  125. $batchData = array_map(fn($item) => json_decode($item, true), array_filter($rawRecords));
  126. if (empty($batchData)) return;
  127. list($status,$msg) = $this->sendToDeviceBatch($batchData);
  128. if(! $status) echo $msg;
  129. }
  130. public function sendToDeviceBatch($data){
  131. list($status,$token) = ClearDataService::getTokenHc();
  132. if(! $status) return [false, $token];
  133. $url = $this->url . "api/module-data/hc_device_machine_record/hc_device_machine_record/diy/create_data";
  134. $post = [
  135. 'data' => [
  136. 'device_machine_record' => $data
  137. ]
  138. ];
  139. $header = ["Authorization: Bearer {$token}","Content-Type:application/json","Site:HCLT"];
  140. $curl = curl_init();
  141. curl_setopt_array($curl, array(
  142. CURLOPT_URL => $url,
  143. CURLOPT_RETURNTRANSFER => true,
  144. CURLOPT_ENCODING => '',
  145. CURLOPT_MAXREDIRS => 10,
  146. CURLOPT_TIMEOUT => 15,
  147. CURLOPT_FOLLOWLOCATION => true,
  148. CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
  149. CURLOPT_CUSTOMREQUEST => 'POST',
  150. CURLOPT_POSTFIELDS => json_encode($post),
  151. CURLOPT_SSL_VERIFYPEER => false,
  152. CURLOPT_HTTPHEADER => $header,
  153. ));
  154. $response = curl_exec($curl);
  155. if ($response === false) {
  156. // 获取错误号
  157. $errorNumber = curl_errno($curl);
  158. // 获取错误信息
  159. $errorMessage = curl_error($curl);
  160. $message = "cURL Error #{$errorNumber}: {$errorMessage}";
  161. Log::channel('apiLog')->info('hc写入数据', ["param" => $message]);
  162. }
  163. curl_close($curl);
  164. $res = json_decode($response,true);
  165. if(! isset($res['status'])){//报错了
  166. Log::channel('apiLog')->info('hc写入数据返回错误', ["param" => $response]);
  167. }
  168. return [true, ''];
  169. }
  170. protected function echoMessage(OutputInterface $output)
  171. {
  172. $output->writeln($this->data);
  173. }
  174. function getNowDay(){
  175. // 获取当前时间
  176. $currentDateTime = new \DateTime();
  177. // 设置时区为 PRC
  178. $currentDateTime->setTimezone(new \DateTimeZone('UTC'));
  179. // 格式化时间为 "2023-09-21T16:00:00.000Z" 的格式
  180. $formattedDateTime = $currentDateTime->format('Y-m-d\TH:i:s.000\Z');
  181. return $formattedDateTime;
  182. }
  183. }