ProcessWMSArchiveDataJob.php 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. <?php
  2. namespace App\Jobs;
  3. use App\Model\SyncTempRecordProduct;
  4. use Illuminate\Bus\Queueable;
  5. use Illuminate\Contracts\Queue\ShouldQueue;
  6. use Illuminate\Foundation\Bus\Dispatchable;
  7. use Illuminate\Queue\InteractsWithQueue;
  8. use Illuminate\Queue\SerializesModels;
  9. use Illuminate\Support\Facades\DB;
  10. use Illuminate\Support\Facades\Log;
  11. use Symfony\Component\Console\Output\ConsoleOutput;
  12. use Symfony\Component\Console\Output\OutputInterface;
  /**
   * Queued job that pushes one product (material) sync record to the WMS
   * `/erp/material` endpoint and records the outcome.
   *
   * Flow, as implemented below:
   *  - load the SyncTempRecordProduct row whose id is in the job payload;
   *  - skip rows that are missing or whose status is not `status_zero`;
   *  - POST the mapped payload to WMS;
   *  - on success mark the row `status_one` and maintain `sync_snapshot_product`;
   *  - on any failure store the reason in `error_msg` and mark the row `status_two`.
   */
  class ProcessWMSArchiveDataJob implements ShouldQueue
  {
      use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

      /**
       * Seconds reserved out of the job timeout for database work around the
       * HTTP call, so the HTTP request always gives up before the queue worker
       * terminates the job.
       */
      private const HTTP_TIMEOUT_MARGIN = 10;

      /**
       * Job payload; only the 'id' key (SyncTempRecordProduct primary key) is read.
       *
       * @var array
       */
      protected $data;

      /**
       * Queue worker timeout for this job, in seconds.
       *
       * @var int
       */
      public $timeout = 30;

      /**
       * @param array $data Row data of the sync record; must contain 'id'.
       */
      public function __construct($data)
      {
          $this->data = $data;
      }

      /**
       * Run the sync. Failures reported by settle() and any thrown error are
       * written back to the record; after a thrown error the job is deleted so
       * the worker does not retry it.
       *
       * @return void
       */
      public function handle()
      {
          try {
              [$ok, $msg] = $this->settle();
              if (! $ok) {
                  $this->finalDo($msg);
              }
          } catch (\Throwable $e) {
              $this->finalDo("异常:" . $e->getMessage());
              $this->delete();
          }
      }

      /**
       * Mark the record as failed (`status_two`) and store the reason.
       *
       * @param string $msg Failure reason saved to `error_msg`.
       * @return void
       */
      private function finalDo($msg)
      {
          SyncTempRecordProduct::where('id', $this->data['id'])
              ->update(['error_msg' => $msg, 'status' => SyncTempRecordProduct::status_two]);
      }

      /**
       * Send the record to WMS and, on success, update local state.
       *
       * Failure persistence is left to the caller (handle() -> finalDo()), so the
       * failed status is written exactly once.
       *
       * @return array Two-element list: [bool $ok, string $message].
       */
      private function settle()
      {
          $record = SyncTempRecordProduct::where('id', $this->data['id'])->first();

          // Guard: nothing to do when the row is gone or is no longer in status_zero.
          if (! $record || $record->status != SyncTempRecordProduct::status_zero) {
              return [true, '已处理或记录不存在'];
          }

          // Decode the raw U8 payload (the model may or may not already cast it to an array).
          $u8Data = is_array($record->payload) ? $record->payload : json_decode($record->payload, true);
          if (! is_array($u8Data)) {
              // Fail with a clear reason instead of an "array offset on null" error further down.
              return [false, 'payload 不是有效的 JSON 对象'];
          }

          // Translate the local operation type into the value WMS expects.
          $operationType = SyncTempRecordProduct::$opMapping[$record->op_type] ?? null;
          if ($operationType === null) {
              return [false, '未知的操作类型: ' . $record->op_type];
          }

          $jsonBody = [
              'operationType'    => $operationType,
              'materialCode'     => $u8Data['product_code'],
              'materialName'     => $u8Data['product_name'],
              'materialSpec'     => $u8Data['product_size'] ?? '',
              'materialTypeCode' => $u8Data['product_category_code'],
              'materialTypeName' => $u8Data['product_category_name'],
              'unitCode'         => $u8Data['product_unit_title'],
              'netWeight'        => 0,
              'grossWeight'      => $u8Data['grossWeight'] ?? 0,
          ];

          $apiUrl = config('wms.api_url') . '/erp/material';

          // Keep the HTTP timeout below the job timeout: if both were equal the
          // worker could kill the job before cURL gives up, leaving the record
          // stuck in status_zero with no error recorded.
          $httpTimeout = max(1, (int) $this->timeout - self::HTTP_TIMEOUT_MARGIN);

          [$ok, $result] = $this->post_helper(
              $apiUrl,
              $jsonBody,
              ['Content-Type: application/json'],
              $httpTimeout
          );
          if (! $ok) {
              // Remote rejection or network failure; $result is the reason text.
              return [false, $result];
          }

          // WMS accepted the record: update the row and the snapshot atomically.
          DB::transaction(function () use ($record, $u8Data) {
              $record->update([
                  'status' => SyncTempRecordProduct::status_one,
              ]);

              // Keep the snapshot table in step so the next comparison run sees
              // the state WMS now has.
              if ($record->op_type == SyncTempRecordProduct::opt_two) {
                  // NOTE(review): opt_two is presumably the "delete" operation — confirm.
                  DB::table('sync_snapshot_product')
                      ->where('code', $u8Data['product_code'])
                      ->where('ufts', $u8Data['ufts'])
                      ->delete();
              } else {
                  DB::table('sync_snapshot_product')->updateOrInsert(
                      ['code' => $u8Data['product_code']],
                      ['code' => $u8Data['product_code'], 'ufts' => $u8Data['ufts'], 'payload' => json_encode($u8Data)]
                  );
              }
          });

          return [true, ''];
      }

      /**
       * POST a JSON body with cURL and interpret the WMS response envelope.
       *
       * The call counts as successful only when the response decodes to an array
       * whose `code` equals 200.
       *
       * @param string     $url     Absolute endpoint URL.
       * @param mixed|null $data    Value to JSON-encode as the request body; null sends no body.
       * @param string[]   $header  Raw HTTP header lines.
       * @param int        $timeout Total request timeout in seconds.
       * @return array Two-element list: [true, array $decodedResponse] on success,
       *               [false, string $reason] on failure.
       */
      public function post_helper($url, $data, $header = [], $timeout = 30)
      {
          Log::channel('apiLog')->info('存货同步', ["api" => $url, "param" => $data]);

          // Encode up front so an encoding failure is reported instead of
          // silently posting `false` as the body.
          $body = null;
          if (! is_null($data)) {
              $body = json_encode($data);
              if ($body === false) {
                  $reason = json_last_error_msg();
                  Log::channel('apiLog')->error('存货同步异常:WMS_JSON_ENCODE_ERROR', ["msg" => $reason]);
                  return [false, "请求参数编码失败: " . $reason];
              }
          }

          $ch = curl_init();
          curl_setopt($ch, CURLOPT_URL, $url);
          curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
          curl_setopt($ch, CURLOPT_POST, 1);
          curl_setopt($ch, CURLOPT_HTTPHEADER, $header);
          // NOTE(review): TLS peer verification is disabled. Kept for backward
          // compatibility, but this allows man-in-the-middle attacks on https
          // endpoints — enable it once WMS presents a verifiable certificate.
          curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
          curl_setopt($ch, CURLOPT_TIMEOUT, $timeout);
          if ($body !== null) {
              curl_setopt($ch, CURLOPT_POSTFIELDS, $body);
          }

          $raw = curl_exec($ch);
          // Collect everything needed from the handle before closing it.
          $errorMessage = $raw === false ? curl_error($ch) : '';
          $httpCode = (int) curl_getinfo($ch, CURLINFO_HTTP_CODE);
          curl_close($ch);

          if ($raw === false) {
              Log::channel('apiLog')->error('存货同步异常:WMS_CURL_ERROR', ["msg" => $errorMessage]);
              return [false, "网络错误: " . $errorMessage];
          }

          $return = json_decode($raw, true);
          Log::channel('apiLog')->info('存货同步返回结果', ["res" => $return]);

          if (! is_array($return)) {
              // Gateway error pages and empty bodies end up here; the HTTP status
              // is the only useful diagnostic in that case.
              return [false, "接口返回非JSON数据, HTTP状态码: " . $httpCode];
          }

          // Success requires an explicit code of 200 in the response envelope.
          if (isset($return['code']) && $return['code'] == 200) {
              return [true, $return];
          }

          $msg = $return['message'] ?? '未知接口错误';
          if (! is_string($msg)) {
              // The reason is later stored in a text column, so flatten structured messages.
              $encoded = json_encode($msg, JSON_UNESCAPED_UNICODE);
              $msg = $encoded === false ? '未知接口错误' : $encoded;
          }

          return [false, $msg];
      }

      /**
       * Write the job payload to the given console output as JSON.
       *
       * @param OutputInterface $output Destination for the message.
       * @return void
       */
      protected function echoMessage(OutputInterface $output)
      {
          $output->writeln(json_encode($this->data));
      }
  }