ProcessDataJob.php 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. <?php
  2. namespace App\Jobs;
  3. use App\Model\Box;
  4. use App\Model\DispatchSub;
  5. use App\Model\ErrorTable;
  6. use App\Service\FinishedOrderService;
  7. use App\Service\FyyOrderService;
  8. use Illuminate\Bus\Queueable;
  9. use Illuminate\Contracts\Queue\ShouldQueue;
  10. use Illuminate\Foundation\Bus\Dispatchable;
  11. use Illuminate\Queue\InteractsWithQueue;
  12. use Illuminate\Queue\SerializesModels;
  13. use Illuminate\Support\Facades\DB;
  14. use Illuminate\Support\Facades\Redis;
  15. use MongoDB\Driver\Exception\Exception;
  16. use Symfony\Component\Console\Output\ConsoleOutput;
  17. use Symfony\Component\Console\Output\OutputInterface;
  /**
   * Queued job that pushes a submitted document to the matching save service
   * and, when that fails, resets the rows that were flagged for processing
   * and writes a row to `error_table`.
   *
   * The value of `$type` selects the save method ({@see $function}), the
   * rollback method ({@see $function_reback}) and the queue name
   * ({@see $jobs}).
   */
  class ProcessDataJob implements ShouldQueue
  {
      use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

      // Queue names.
      const job_one = 'hc_finished_operation'; // completion (finished work)
      const job_two = 'hc_box_operation'; // packaging

      /**
       * Failure count (tracked in the Redis hash `order_failures`) at which the
       * job stops re-queuing itself and rolls back instead.
       *
       * With the value 1 the first reported failure is already final, which is
       * what the previous hard-coded comparison did; raise it to allow retries.
       */
      const MAX_FAILURES = 1;

      /** @var array Payload; reads 'result', 'data' and (type 3 only) 'quantity_count'. */
      protected $data;

      /** @var array Submitting user; reads 'id', 'operate_time' and 'zt'. */
      protected $user;

      /** @var int Operation type, a key of {@see $function}. */
      protected $type;

      // NOTE(review): Laravel's worker appears to treat 0 as "no attempt limit" — confirm this is intended.
      public $tries = 0;

      public $timeout = 60;

      /**
       * Save method per type.
       * 1 = finished-goods receipt, 2 = sales outbound order,
       * 3 = finished-goods receipt from the mobile client.
       * (The original note also lists 4 = sales outbound order, but no method is mapped to it.)
       */
      protected $function = [
          1 => 'U8Rdrecord10Save',
          2 => 'U8Rdrecord32Save',
          3 => 'U8Rdrecord10SaveMobile'
      ];

      /**
       * Rollback method per type: resets the status of the flagged rows to 0.
       */
      protected $function_reback = [
          1 => 'reBackOne',
          2 => 'reBackTwo',
          3 => 'reBackThree',
      ];

      /**
       * Queue name per type, used when the job re-dispatches itself.
       */
      protected $jobs = [
          1 => self::job_one,
          2 => self::job_two,
          3 => self::job_one,
      ];

      /**
       * Create a new job instance.
       *
       * `$user` used to declare a default of `[]`, but because the required
       * `$type` follows it that default could never be used (and such a
       * signature is deprecated since PHP 8.0), so it has been dropped.
       * Passing null is still accepted and stored as an empty array.
       *
       * @param array      $data ['result' => query result, 'data' => parameters submitted by the user]
       * @param array|null $user information about the submitting user
       * @param int        $type 1 = finished-goods receipt (completion operation),
       *                         2 = sales outbound order (packing-slip scan-out),
       *                         3 = finished-goods receipt from the mobile client
       *
       * @return void
       */
      public function __construct($data, $user, $type)
      {
          $this->data = $data;
          $this->user = $user ?? [];
          $this->type = $type;
      }

      /**
       * Execute the job.
       *
       * Runs the save method mapped to the current type. A falsy status goes
       * through errorSettle(); anything thrown triggers the rollback and is
       * written to the error table. Unknown types are ignored silently.
       *
       * @return void
       */
      public function handle()
      {
          $function = $this->function[$this->type] ?? ''; // save method to call
          if (empty($function)) {
              return;
          }

          try {
              list($status, $msg) = $this->$function();
              if (! $status) {
                  $this->errorSettle($msg);
              }
          } catch (\Throwable $e) {
              // Throwable rather than Exception so that engine errors
              // (TypeError, etc.) also reset the flagged rows and get recorded.
              $this->rollBackFlaggedRows();
              $this->recordErrorTable($e->getFile() . $e->getMessage() . $e->getLine());
          }

          // Print the payload to the console.
          $this->echoMessage(new ConsoleOutput());
      }

      /**
       * Write the JSON-encoded payload to the given output.
       *
       * @param OutputInterface $output
       *
       * @return void
       */
      protected function echoMessage(OutputInterface $output)
      {
          $output->writeln(json_encode($this->data));
      }

      /**
       * Finished-goods receipt.
       *
       * @return array [status, message] as returned by FinishedOrderService::addInJob()
       */
      private function U8Rdrecord10Save()
      {
          $service = new FinishedOrderService();
          list($status, $msg) = $service->addInJob(
              $this->data['result'],
              $this->data['data'],
              $this->user
          );

          return [$status, $msg];
      }

      /**
       * Finished-goods receipt submitted from the mobile client.
       *
       * @return array [status, message] as returned by FinishedOrderService::addMobileInJob()
       */
      private function U8Rdrecord10SaveMobile()
      {
          $service = new FinishedOrderService();
          list($status, $msg) = $service->addMobileInJob(
              $this->data['result'],
              $this->data['data'],
              $this->data['quantity_count'],
              $this->user
          );

          return [$status, $msg];
      }

      /**
       * Rollback for type 1: set `job_status` back to 0 on the `dispatch_sub`
       * rows whose ids are listed under `data.id`.
       *
       * @return void
       */
      private function reBackOne()
      {
          $data = $this->data['data'];
          $this->resetRows('dispatch_sub', 'id', $data['id'], ['job_status' => 0]);
      }

      /**
       * Rollback for type 3: set `job_status` back to 0 on the `dispatch_sub`
       * rows; here `data` is a list of rows and the ids come from their 'id' column.
       *
       * @return void
       */
      private function reBackThree()
      {
          $data = $this->data['data'];
          $this->resetRows('dispatch_sub', 'id', array_column($data, 'id'), ['job_status' => 0]);
      }

      /**
       * Sales outbound order.
       *
       * @return array [status, message] as returned by FyyOrderService::addInJob()
       */
      private function U8Rdrecord32Save()
      {
          $service = new FyyOrderService();
          list($status, $msg) = $service->addInJob(
              $this->data['result'],
              $this->data['data'],
              $this->user
          );

          return [$status, $msg];
      }

      /**
       * Rollback for type 2: set `state` back to 0 on the `box` rows whose
       * `order_no` is listed under `data.order_no`.
       *
       * @return void
       */
      private function reBackTwo()
      {
          $data = $this->data['data'];
          $this->resetRows('box', 'order_no', $data['order_no'], ['state' => 0]);
      }

      /**
       * Shared body of the rollback methods: apply `$reset` to every row of
       * `$table` whose `$column` is in `$values`, on the user's connection.
       *
       * @param string $table
       * @param string $column
       * @param mixed  $values list handed to whereIn()
       * @param array  $reset  column => value pairs to write
       *
       * @return void
       */
      private function resetRows($table, $column, $values, array $reset)
      {
          DB::connection($this->settleDatabase())
              ->table($table)
              ->whereIn($column, $values)
              ->update($reset);
      }

      /**
       * Invoke the rollback method mapped to the current type, if there is one.
       *
       * Guards the dynamic call so that a type without a rollback entry does
       * not end in a call to a method with an empty name.
       *
       * @return void
       */
      private function rollBackFlaggedRows()
      {
          $method = $this->function_reback[$this->type] ?? '';
          if ($method !== '' && method_exists($this, $method)) {
              $this->$method();
          }
      }

      /**
       * Handle a save that reported failure through its status flag.
       *
       * Increments a per-payload counter in the Redis hash `order_failures`.
       * While the counter is below MAX_FAILURES the job is re-dispatched to its
       * queue with a 2 second delay; otherwise the counter is removed, the
       * flagged rows are rolled back and the message is stored in the error table.
       *
       * @param mixed $msg failure message returned by the save service
       *
       * @return void
       */
      private function errorSettle($msg)
      {
          $redis = Redis::connection();
          $failureKey = md5(json_encode($this->data));

          // hIncrBy returns the counter value after the increment.
          $failureCount = $redis->hIncrBy('order_failures', $failureKey, 1);

          if ($failureCount < self::MAX_FAILURES) {
              // Put the job back on its queue.
              $queue = $this->jobs[$this->type];
              self::dispatch($this->data, $this->user, $this->type)
                  ->onQueue($queue)
                  ->delay(now()->addSeconds(2));

              return;
          }

          // Giving up: drop the counter, roll back, record the error.
          $redis->hDel('order_failures', $failureKey);
          $this->rollBackFlaggedRows();
          $this->recordErrorTable($msg);
      }

      /**
       * Insert a row describing the failure into `error_table` on the user's
       * connection.
       *
       * User fields fall back to null when absent so that a job created
       * without user details can still record its error instead of raising a
       * second error from inside the error path.
       *
       * @param mixed $msg
       *
       * @return void
       */
      private function recordErrorTable($msg)
      {
          DB::connection($this->settleDatabase())->table('error_table')->insert([
              'msg' => $msg,
              'data' => json_encode($this->data),
              'user_id' => $this->user['id'] ?? null,
              'user_operation_time' => $this->user['operate_time'] ?? null,
              'type' => $this->type
          ]);
      }

      /**
       * Failure hook: store the exception's file, message and line in the
       * error table. No status rollback is performed here.
       *
       * @param \Throwable $exception
       *
       * @return void
       */
      public function failed($exception)
      {
          $errorMessage = $exception->getFile() . $exception->getMessage() . $exception->getLine();
          $this->recordErrorTable($errorMessage);
      }

      /**
       * Resolve the database connection name for the user's `zt` value
       * (empty string when the user has none) via
       * FinishedOrderService::getConnectionName().
       *
       * @return mixed connection name accepted by DB::connection()
       */
      public function settleDatabase()
      {
          $zt = $this->user['zt'] ?? '';

          return (new FinishedOrderService())->getConnectionName($zt);
      }
  }