ProcessDataJob.php 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. <?php
  2. namespace App\Jobs;
  3. use App\Model\DispatchSub;
  4. use App\Model\ErrorTable;
  5. use App\Service\FinishedOrderService;
  6. use Illuminate\Bus\Queueable;
  7. use Illuminate\Contracts\Queue\ShouldQueue;
  8. use Illuminate\Foundation\Bus\Dispatchable;
  9. use Illuminate\Queue\InteractsWithQueue;
  10. use Illuminate\Queue\SerializesModels;
  11. use Illuminate\Support\Facades\DB;
  12. use Illuminate\Support\Facades\Redis;
  13. use MongoDB\Driver\Exception\Exception;
  14. use Symfony\Component\Console\Output\ConsoleOutput;
  15. use Symfony\Component\Console\Output\OutputInterface;
  16. class ProcessDataJob implements ShouldQueue
  17. {
  18. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  19. const job_one = 'finished_operation';
  20. const job_two = '';
  21. protected $data;
  22. protected $user;
  23. protected $type;
  24. //1 代表产成品入库 2 销售出库单
  25. protected $function = [
  26. 1 => 'U8Rdrecord10Save',
  27. 2 => 'U8Rdrecord32Save'
  28. ];
  29. protected $function_reback = [
  30. 1 => 'reBackOne',
  31. ];
  32. protected $jobs = [
  33. 1 => self::job_one,
  34. 2 => self::job_two
  35. ];
  36. /**
  37. * Create a new job instance.
  38. * $data = [
  39. 'result' => $msg, //查询结果
  40. 'data' => $data, //用户提交的参数
  41. ];
  42. * $user 提交用户的信息
  43. * $type //1 代表产成品入库(完工操作) 2 销售出库单(包装单扫描出库)
  44. *
  45. * @return void
  46. */
  47. public function __construct($data, $user = [], $type)
  48. {
  49. $this->data = $data;
  50. $this->user = $user ?? [];
  51. $this->type = $type;
  52. }
  53. /** file_put_contents('charge.txt',"标记位置退出".PHP_EOL,8);
  54. * Execute the job.
  55. *
  56. * @return void
  57. */
  58. public function handle()
  59. {
  60. try {
  61. $function = $this->function[$this->type] ?? '';//调用方法 入库
  62. $function_back = $this->function_reback[$this->type] ?? '';//数据回退方法
  63. if(empty($function)) return;
  64. list($status,$msg) = $this->$function();
  65. if(! $status) $this->errorSettle($msg);
  66. } catch (\Exception $e) {
  67. $this->$function_back();
  68. $this->recordErrorTable($e->getFile() . $e->getMessage() . $e->getLine());
  69. }
  70. //输出信息
  71. $this->echoMessage(new ConsoleOutput());
  72. }
  73. protected function echoMessage(OutputInterface $output)
  74. {
  75. //输出消息
  76. $output->writeln(json_encode($this->data));
  77. }
  78. //产成品入库
  79. private function U8Rdrecord10Save(){
  80. $service = new FinishedOrderService();
  81. //标记
  82. list($status,$msg) = $service->addInJob($this->data['result'],$this->data['data']);
  83. return [$status,$msg];
  84. }
  85. private function reBackOne(){
  86. //数据回退
  87. $data = $this->data['data'];
  88. //进入队列的数据
  89. DispatchSub::whereIn('id',$data['id'])->update([
  90. 'job_status' => 0,
  91. ]);
  92. }
  93. //销售单出库
  94. private function U8Rdrecord32Save(){
  95. }
  96. private function errorSettle($msg){
  97. $redis = Redis::connection();
  98. $order_failures_key = md5(json_encode($this->data));
  99. // 从Redis中获取失败计数
  100. $failureCount = $redis->hIncrBy('order_failures', $order_failures_key, 1);
  101. //队列名
  102. $job = $this->jobs[$this->type];
  103. if ($failureCount < 2) {
  104. // 将任务重新放回队列
  105. self::dispatch($this->data,$this->user,$this->type)->onQueue($job)->delay(now()->addSeconds(2));
  106. } else {
  107. // 删除失败计数
  108. $redis->hDel('order_failures', $order_failures_key);
  109. //数据回退
  110. $function_back = $this->function_reback[$this->type] ?? '';
  111. $this->$function_back();
  112. //记录错误
  113. $this->recordErrorTable($msg);
  114. }
  115. }
  116. private function recordErrorTable($msg){
  117. $model = new ErrorTable();
  118. $model->msg = $msg;
  119. $model->data = json_encode($this->data);
  120. $model->user_data = json_encode($this->user);
  121. $model->save();
  122. }
  123. public function failed($exception)
  124. {
  125. // 记录失败错误信息到日志或其他媒介
  126. $errorMessage = $exception->getFile() . $exception->getMessage() . $exception->getLine();
  127. $this->recordErrorTable($errorMessage);
  128. }
  129. }