ProcessWMSDataJob.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. <?php
  2. namespace App\Jobs;
  3. use App\Model\SyncTempRecord;
  4. use App\Service\U8ThirdPartyService;
  5. use Illuminate\Bus\Queueable;
  6. use Illuminate\Contracts\Queue\ShouldQueue;
  7. use Illuminate\Foundation\Bus\Dispatchable;
  8. use Illuminate\Queue\InteractsWithQueue;
  9. use Illuminate\Queue\SerializesModels;
  10. use Illuminate\Support\Facades\DB;
  11. use Illuminate\Support\Facades\Log;
  12. use Symfony\Component\Console\Output\ConsoleOutput;
  13. use Symfony\Component\Console\Output\OutputInterface;
  14. class ProcessWMSDataJob implements ShouldQueue
  15. {
  16. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  17. protected $data;
  18. public $timeout = 60;
  19. public function __construct($data)
  20. {
  21. //record表
  22. $this->data = $data;
  23. }
  24. public function handle()
  25. {
  26. try {
  27. list($bool, $msg) = $this->settle();
  28. if(! $bool) $this->finalDo($msg);
  29. Log::channel('queue_daily')->info('队列执行任务');
  30. } catch (\Throwable $e) {
  31. $this->finalDo("异常:" . $e->getMessage());
  32. Log::channel('queue_daily')->info('队列异常', ['msg' => $e->getMessage() . '|' . $e->getLine()]);
  33. $this->delete();
  34. }
  35. }
  36. private function finalDo($msg){
  37. SyncTempRecord::where('id', $this->data['id'])
  38. ->update(['error_msg' => $msg, 'status' => SyncTempRecord::status_one]);
  39. }
  40. private function settle()
  41. {
  42. $id = $this->data['id'];
  43. $record = SyncTempRecord::where('id', $id)->first();
  44. // 1. 安全检查:如果记录不存在,或者已经是成功状态(status_one),则退出
  45. if (!$record || $record->status != SyncTempRecord::status_zero) {
  46. return [true, '已处理或记录不存在'];
  47. }
  48. // 2. 解析 U8 原始数据
  49. $u8Data = is_array($record->payload) ? $record->payload : json_decode($record->payload, true);
  50. $type = $record->type;
  51. if($type == SyncTempRecord::type_eight){
  52. //检验单
  53. list($status, $msg) = $this->bjOrder($record, $u8Data);
  54. }else{
  55. //同步到音飞
  56. list($status, $msg) = $this->orderInsert($record, $u8Data);
  57. }
  58. return [$status, $msg];
  59. }
  60. public function orderInsert($record, $u8Data){
  61. $orderType = SyncTempRecord::$map[$record->type];
  62. if(is_array($orderType)) $orderType = $orderType[$record->type_2];
  63. // 3. 组装报文
  64. $jsonBody = [
  65. 'orderType' => $orderType,
  66. 'orderNo' => $record->u8_no,
  67. 'orderId' => $record->u8_id,
  68. 'operationType' => SyncTempRecord::$opMapping[$record->op_type],
  69. 'lottar1' => '',
  70. 'lottar2' => '',
  71. 'lottar3' => '',
  72. 'lottar4' => '',
  73. 'lottar5' => '',
  74. 'lottar6' => '',
  75. 'lottar7' => '',
  76. 'lottar8' => '',
  77. 'lottar9' => '',
  78. 'details' => []
  79. ];
  80. // 4. 接口路由
  81. $inboundTypes = [SyncTempRecord::type_one, SyncTempRecord::type_three, SyncTempRecord::type_six, SyncTempRecord::type_five];
  82. $path = in_array($record->type, $inboundTypes) ? '/erp/inbound' : '/erp/outbound';
  83. if($record->type_2 == 1 && $record->type == SyncTempRecord::type_one) $path = '/erp/outbound';
  84. $apiUrl = config('wms.api_url') . $path;
  85. if (isset($u8Data['details']) && is_array($u8Data['details'])) {
  86. if($path == '/erp/inbound'){ //入
  87. foreach ($u8Data['details'] as $item) {
  88. $productDate = "";
  89. if(! empty($item['dMDate'])) $productDate = date("Y-m-d", strtotime($item['dMDate']));
  90. $validTime = 0;
  91. if(! empty($item['iMassDate'])) $validTime = $item['iMassDate'];
  92. $lot = "";
  93. if(! empty($item['cBatch'])) $lot = $item['cBatch'];
  94. $jsonBody['details'][] = [
  95. 'lineNum' => $item['lineNum'] ?? 0,
  96. 'materialCode' => $item['materialCode'] ?? '',
  97. 'planQty' => (float)abs($item['planQty']),
  98. 'productDate' => $productDate,
  99. 'validTime' => $validTime,
  100. 'lot' => $lot,
  101. 'lottar1' => $item['cDefine23'] ?? "",
  102. 'lottar2' => "",
  103. 'lottar3' => '',
  104. 'lottar4' => '',
  105. 'lottar5' => '',
  106. 'lottar6' => '',
  107. 'lottar7' => '',
  108. 'lottar8' => '',
  109. 'lottar9' => '',
  110. ];
  111. }
  112. }else{//出
  113. foreach ($u8Data['details'] as $item) {
  114. $jsonBody['details'][] = [
  115. 'lineNum' => $item['lineNum'] ?? 0,
  116. 'materialCode' => $item['materialCode'] ?? '',
  117. 'planQty' => (float)abs($item['planQty']),
  118. 'lottar1' => "",
  119. 'lottar2' => $item['cBatch'] ?? "",
  120. 'lottar3' => '',
  121. 'lottar4' => '',
  122. 'lottar5' => '',
  123. 'lottar6' => '',
  124. 'lottar7' => '',
  125. 'lottar8' => '',
  126. 'lottar9' => '',
  127. ];
  128. }
  129. }
  130. }
  131. // 5. 调用 post_helper
  132. list($status, $result) = $this->post_helper($apiUrl, $jsonBody, ['Content-Type: application/json']);
  133. if (!$status) {
  134. // 接口返回失败或网络失败
  135. $record->update(['status' => SyncTempRecord::status_two, 'error_msg' => "音飞API返回:" . $result]);
  136. return [false, $result];
  137. }
  138. // 6. 接口返回成功 (200) 的后续操作
  139. DB::transaction(function () use ($record, $u8Data) {
  140. // 更新本地流水状态为成功
  141. $record->update([
  142. 'status' => SyncTempRecord::status_one,
  143. ]);
  144. // 同步维护快照表,保证下次 Command 比对正确
  145. if ($record->op_type == SyncTempRecord::opt_two) {
  146. DB::table('sync_snapshot')
  147. ->where('type', $record->type)
  148. ->where('u8_no', $record->u8_no)
  149. ->where('u8_id', $record->u8_id)
  150. ->delete();
  151. } else {
  152. DB::table('sync_snapshot')->updateOrInsert(
  153. ['type' => $record->type, 'u8_no' => $record->u8_no],
  154. ['u8_id' => $record->u8_id,'last_upd_time' => $record->u8_upd, 'order_date' => $u8Data['order_date'], 'payload' => json_encode($u8Data)]
  155. );
  156. }
  157. });
  158. return [true, ''];
  159. }
  160. //代码是有bug的 因为业务
  161. public function bjOrder($record, $u8Data){
  162. $hg_quantity = $u8Data['hg_quantity'] ?? 0;
  163. $rb_quantity = $u8Data['rb_quantity'] ?? 0;
  164. $hg_not_quantity = $u8Data['hg_not_quantity'] ?? 0;
  165. if($hg_quantity > 0){
  166. list($status, $msg) = $this->zj($record, $u8Data, 1);
  167. }elseif ($rb_quantity > 0) {
  168. list($status, $msg) = $this->zj($record, $u8Data,3);
  169. }elseif($hg_not_quantity > 0){
  170. list($status, $msg) = $this->zj($record, $u8Data,2);
  171. }
  172. return [$status ?? true, $msg ?? ""];
  173. }
  174. public function zj($record, $u8Data, $type){
  175. if($record['type_2'] == 1){
  176. $orderNo = $u8Data['from_order'];
  177. }else{
  178. $orderNo = $u8Data['bj_order'];
  179. }
  180. // 3. 组装报文
  181. $jsonBody = [
  182. 'orderNo' => $orderNo,
  183. 'materialCode' => $u8Data['materialCode'],
  184. 'lot' => $u8Data['lot'] ?? "",
  185. 'inspectionStatus' => $type,
  186. ];
  187. // 4. 接口路由
  188. $apiUrl = config('wms.api_url') . '/erp/inspection';
  189. // 5. 调用 post_helper
  190. list($status, $result) = $this->post_helper($apiUrl, $jsonBody, ['Content-Type: application/json']);
  191. if (!$status) {
  192. // 接口返回失败或网络失败
  193. $record->update(['status' => SyncTempRecord::status_two, 'error_msg' => "音飞API返回:" . $result]);
  194. return [false, $result];
  195. }
  196. //生成用友单据
  197. $service = new U8ThirdPartyService();
  198. //报检单类型
  199. $bg_type = $record['type_2'];
  200. if($bg_type == 1){
  201. //采购
  202. if($type == 1 || $type == 3){
  203. //合格 让步 =》 采购入库
  204. list($status, $msg) = $service->purchaseInByZj(['record' => $record, 'payload' => $u8Data,'cvouchtype' => 'QM03','type' => $type]);
  205. }else{
  206. //不合格 =》其他入库单
  207. list($status, $msg) = $service->otherInByZj(['record' => $record, 'payload' => $u8Data, 'cvouchtype' => 'QM03']);
  208. }
  209. }elseif ($bg_type == 2){
  210. //产品
  211. if($type == 1 || $type == 3){
  212. //合格 让步 =》 产成品入库
  213. list($status, $msg) = $service->productInByZj(['record' => $record, 'payload' => $u8Data, 'cvouchtype' => 'QM04','type' => $type]);
  214. }else{
  215. //不合格 =》其他入库单
  216. list($status, $msg) = $service->otherInByZj(['record' => $record, 'payload' => $u8Data, 'cvouchtype' => 'QM04']);
  217. }
  218. }else{
  219. //退货 不需要了
  220. list($status, $msg) = [true, ''];
  221. }
  222. if (!$status) {
  223. // 接口返回失败或网络失败
  224. $record->update(['status' => SyncTempRecord::status_two, 'error_msg' => $msg]);
  225. return [false, $msg];
  226. }
  227. // 6. 接口返回成功 (200) 的后续操作
  228. DB::transaction(function () use ($record, $u8Data) {
  229. // 更新本地流水状态为成功
  230. $record->update([
  231. 'status' => SyncTempRecord::status_one,
  232. ]);
  233. // 同步维护快照表,保证下次 Command 比对正确
  234. if ($record->op_type == SyncTempRecord::opt_two) {
  235. DB::table('sync_snapshot')
  236. ->where('type', $record->type)
  237. ->where('u8_no', $record->u8_no)
  238. ->where('u8_id', $record->u8_id)
  239. ->delete();
  240. } else {
  241. DB::table('sync_snapshot')->updateOrInsert(
  242. ['type' => $record->type, 'u8_no' => $record->u8_no],
  243. ['u8_id' => $record->u8_id,'last_upd_time' => $record->u8_upd, 'order_date' => $u8Data['order_date'], 'payload' => json_encode($u8Data)]
  244. );
  245. }
  246. });
  247. return [true, ''];
  248. }
  249. public function post_helper($url, $data, $header = [], $timeout = 30)
  250. {
  251. Log::channel('apiLog')->info('入参', ["api" => $url, "param" => $data]);
  252. $ch = curl_init();
  253. curl_setopt($ch, CURLOPT_URL, $url);
  254. curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
  255. curl_setopt($ch, CURLOPT_POST, 1);
  256. curl_setopt($ch, CURLOPT_HTTPHEADER, $header);
  257. curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
  258. curl_setopt($ch, CURLOPT_TIMEOUT, $timeout);
  259. if(!is_null($data)) curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
  260. $r = curl_exec($ch);
  261. if ($r === false) {
  262. $errorMessage = curl_error($ch);
  263. curl_close($ch);
  264. Log::channel('apiLog')->error('CURL错误', ["msg" => $errorMessage]);
  265. return [false, "网络错误: " . $errorMessage];
  266. }
  267. curl_close($ch);
  268. $return = json_decode($r, true);
  269. Log::channel('apiLog')->info('出参', ["res" => $return]);
  270. // 判断逻辑:code 存在且为 200 才算 true
  271. if (isset($return['code']) && $return['code'] == 200) {
  272. return [true, $return];
  273. }
  274. $msg = $return['message'] ?? '未知接口错误';
  275. return [false, $msg];
  276. }
  277. protected function echoMessage(OutputInterface $output)
  278. {
  279. //输出消息
  280. $output->writeln(json_encode($this->data));
  281. }
  282. }