ProcessWMSDataJob.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. <?php
  2. namespace App\Jobs;
  3. use App\Model\SyncTempRecord;
  4. use App\Service\U8ThirdPartyService;
  5. use Illuminate\Bus\Queueable;
  6. use Illuminate\Contracts\Queue\ShouldQueue;
  7. use Illuminate\Foundation\Bus\Dispatchable;
  8. use Illuminate\Queue\InteractsWithQueue;
  9. use Illuminate\Queue\SerializesModels;
  10. use Illuminate\Support\Facades\DB;
  11. use Illuminate\Support\Facades\Log;
  12. use Symfony\Component\Console\Output\ConsoleOutput;
  13. use Symfony\Component\Console\Output\OutputInterface;
  14. class ProcessWMSDataJob implements ShouldQueue
  15. {
  16. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  17. protected $data;
  18. public $timeout = 60;
  19. public function __construct($data)
  20. {
  21. //record表
  22. $this->data = $data;
  23. }
  24. public function handle()
  25. {
  26. try {
  27. list($bool, $msg) = $this->settle();
  28. if(! $bool) $this->finalDo($msg);
  29. Log::channel('queue_daily')->info('队列执行任务');
  30. } catch (\Throwable $e) {
  31. $this->finalDo("异常:" . $e->getMessage());
  32. Log::channel('queue_daily')->info('队列异常', ['msg' => $e->getMessage() . '|' . $e->getLine()]);
  33. $this->delete();
  34. }
  35. }
  36. private function finalDo($msg){
  37. SyncTempRecord::where('id', $this->data['id'])
  38. ->update(['error_msg' => $msg, 'status' => SyncTempRecord::status_two]);
  39. }
  40. private function settle()
  41. {
  42. $id = $this->data['id'];
  43. $record = SyncTempRecord::where('id', $id)->first();
  44. // 1. 安全检查:如果记录不存在,或者已经是成功状态(status_one),则退出
  45. if (!$record || $record->status != SyncTempRecord::status_zero) {
  46. return [true, '已处理或记录不存在'];
  47. }
  48. // 2. 解析 U8 原始数据
  49. $u8Data = is_array($record->payload) ? $record->payload : json_decode($record->payload, true);
  50. $type = $record->type;
  51. if($type == SyncTempRecord::type_eight){
  52. //检验单
  53. list($status, $msg) = $this->bjOrder($record, $u8Data);
  54. }else{
  55. //同步到音飞
  56. list($status, $msg) = $this->orderInsert($record, $u8Data);
  57. }
  58. return [$status, $msg];
  59. }
  60. public function orderInsert($record, $u8Data){
  61. $orderType = SyncTempRecord::$map[$record->type];
  62. if(is_array($orderType)) $orderType = $orderType[$record->type_2];
  63. // 3. 组装报文
  64. $jsonBody = [
  65. 'orderType' => $orderType,
  66. 'orderNo' => $record->u8_no,
  67. 'orderId' => $record->u8_id,
  68. 'operationType' => SyncTempRecord::$opMapping[$record->op_type],
  69. 'lottar1' => '',
  70. 'lottar2' => '',
  71. 'lottar3' => '',
  72. 'lottar4' => '',
  73. 'lottar5' => '',
  74. 'lottar6' => '',
  75. 'lottar7' => '',
  76. 'lottar8' => '',
  77. 'lottar9' => '',
  78. 'details' => []
  79. ];
  80. // 4. 接口路由
  81. $inboundTypes = [SyncTempRecord::type_one, SyncTempRecord::type_three, SyncTempRecord::type_six, SyncTempRecord::type_five];
  82. $path = in_array($record->type, $inboundTypes) ? '/erp/inbound' : '/erp/outbound';
  83. if($record->type_2 == 1 && $record->type == SyncTempRecord::type_one) $path = '/erp/outbound';
  84. $apiUrl = config('wms.api_url') . $path;
  85. $bool = true;
  86. if (isset($u8Data['details']) && is_array($u8Data['details'])) {
  87. if($record->type == SyncTempRecord::type_one){
  88. // 采购到货|退货单
  89. $warehouse_code = array_unique(array_column($u8Data['details'],'warehouseCode'));
  90. $warehouse_code_array = ['01', '02', '05', '06' , '24', '25', '50', '51', '52', '53'];
  91. if(empty(array_intersect($warehouse_code, $warehouse_code_array))) $bool = false;
  92. }
  93. if($path == '/erp/inbound'){ //入
  94. foreach ($u8Data['details'] as $item) {
  95. $productDate = "";
  96. if(! empty($item['dMDate'])) $productDate = date("Y-m-d", strtotime($item['dMDate']));
  97. $validTime = 0;
  98. if(! empty($item['iMassDate'])) $validTime = $item['iMassDate'];
  99. $lot = "";
  100. if(! empty($item['cBatch'])) $lot = $item['cBatch'];
  101. $inspectionStatus = 0;
  102. //采购到货单 bGsp为是否质检(1-质检;0-不质检) 不质检直接合格
  103. if($record->type == SyncTempRecord::type_one && isset($item['bGsp']) && ! $item['bGsp']) $inspectionStatus = 1;
  104. $jsonBody['details'][] = [
  105. 'lineNum' => $item['lineNum'] ?? 0,
  106. 'materialCode' => $item['materialCode'] ?? '',
  107. 'warehouseCode' => $item['warehouseCode'] ?? '', //采购到货有仓库编码
  108. 'planQty' => (float)abs($item['planQty']),
  109. 'productDate' => $productDate,
  110. 'validTime' => $validTime,
  111. 'lot' => $lot,
  112. 'inspectionStatus' => $inspectionStatus,
  113. 'lottar1' => $item['cDefine23'] ?? "",
  114. 'lottar2' => "",
  115. 'lottar3' => '',
  116. 'lottar4' => '',
  117. 'lottar5' => '',
  118. 'lottar6' => '',
  119. 'lottar7' => '',
  120. 'lottar8' => '',
  121. 'lottar9' => '',
  122. ];
  123. }
  124. }else{//出
  125. foreach ($u8Data['details'] as $item) {
  126. $jsonBody['details'][] = [
  127. 'lineNum' => $item['lineNum'] ?? 0,
  128. 'materialCode' => $item['materialCode'] ?? '',
  129. 'planQty' => (float)abs($item['planQty']),
  130. 'lottar1' => "",
  131. 'lottar2' => $item['cBatch'] ?? "",
  132. 'lottar3' => '',
  133. 'lottar4' => '',
  134. 'lottar5' => '',
  135. 'lottar6' => '',
  136. 'lottar7' => '',
  137. 'lottar8' => '',
  138. 'lottar9' => '',
  139. ];
  140. }
  141. }
  142. }
  143. if($bool){
  144. //请求音飞
  145. list($status, $result) = $this->post_helper($apiUrl, $jsonBody, ['Content-Type: application/json']);
  146. if (!$status) {
  147. // 接口返回失败或网络失败
  148. $record->update(['status' => SyncTempRecord::status_two, 'error_msg' => "音飞API返回:" . $result]);
  149. return [false, $result];
  150. }
  151. }
  152. // 6. 接口返回成功 (200) 的后续操作
  153. DB::transaction(function () use ($record, $u8Data) {
  154. // 更新本地流水状态为成功
  155. $record->update([
  156. 'status' => SyncTempRecord::status_one,
  157. ]);
  158. // 同步维护快照表,保证下次 Command 比对正确
  159. if ($record->op_type == SyncTempRecord::opt_two) {
  160. DB::table('sync_snapshot')
  161. ->where('type', $record->type)
  162. ->where('u8_no', $record->u8_no)
  163. ->where('u8_id', $record->u8_id)
  164. ->delete();
  165. } else {
  166. DB::table('sync_snapshot')->updateOrInsert(
  167. ['type' => $record->type, 'u8_no' => $record->u8_no],
  168. ['u8_id' => $record->u8_id,'last_upd_time' => $record->u8_upd, 'order_date' => $u8Data['order_date'], 'payload' => json_encode($u8Data)]
  169. );
  170. }
  171. });
  172. return [true, ''];
  173. }
  174. //代码是有bug的 因为业务
  175. public function bjOrder($record, $u8Data){
  176. $hg_quantity = $u8Data['hg_quantity'] ?? 0;
  177. $rb_quantity = $u8Data['rb_quantity'] ?? 0;
  178. $hg_not_quantity = $u8Data['hg_not_quantity'] ?? 0;
  179. if($hg_quantity > 0){
  180. list($status, $msg) = $this->zj($record, $u8Data, 1);
  181. }elseif ($rb_quantity > 0) {
  182. list($status, $msg) = $this->zj($record, $u8Data,3);
  183. }elseif($hg_not_quantity > 0){
  184. list($status, $msg) = $this->zj($record, $u8Data,2);
  185. }
  186. return [$status ?? true, $msg ?? ""];
  187. }
  188. public function zj($record, $u8Data, $type){
  189. if($record['type_2'] == 1){
  190. $orderNo = $u8Data['from_order'];
  191. }else{
  192. $orderNo = $u8Data['bj_order'];
  193. }
  194. // 3. 组装报文
  195. $jsonBody = [
  196. 'orderNo' => $orderNo,
  197. 'materialCode' => $u8Data['materialCode'],
  198. 'lot' => $u8Data['lot'] ?? "",
  199. 'inspectionStatus' => $type,
  200. ];
  201. // 4. 接口路由
  202. $apiUrl = config('wms.api_url') . '/erp/inspection';
  203. // 5. 调用 post_helper
  204. list($status, $result) = $this->post_helper($apiUrl, $jsonBody, ['Content-Type: application/json']);
  205. if (!$status) {
  206. // 接口返回失败或网络失败
  207. $record->update(['status' => SyncTempRecord::status_two, 'error_msg' => "音飞API返回:" . $result]);
  208. return [false, $result];
  209. }
  210. //生成用友单据
  211. $service = new U8ThirdPartyService();
  212. //报检单类型
  213. $bg_type = $record['type_2'];
  214. if($bg_type == 1){
  215. //采购
  216. if($type == 1 || $type == 3){
  217. //合格 让步 =》 采购入库
  218. list($status, $msg) = $service->purchaseInByZj(['record' => $record, 'payload' => $u8Data,'cvouchtype' => 'QM03','type' => $type]);
  219. }else{
  220. //不合格 =》其他入库单
  221. list($status, $msg) = $service->otherInByZj(['record' => $record, 'payload' => $u8Data, 'cvouchtype' => 'QM03']);
  222. }
  223. }elseif ($bg_type == 2){
  224. //产品
  225. if($type == 1 || $type == 3){
  226. //合格 让步 =》 产成品入库
  227. list($status, $msg) = $service->productInByZj(['record' => $record, 'payload' => $u8Data, 'cvouchtype' => 'QM04','type' => $type]);
  228. }else{
  229. //不合格 =》其他入库单
  230. list($status, $msg) = $service->otherInByZj(['record' => $record, 'payload' => $u8Data, 'cvouchtype' => 'QM04']);
  231. }
  232. }else{
  233. //退货 不需要了
  234. list($status, $msg) = [true, ''];
  235. }
  236. if (! $status) {
  237. // 接口返回失败或网络失败
  238. $record->update(['status' => SyncTempRecord::status_two, 'error_msg' => $msg]);
  239. return [false, $msg];
  240. }
  241. // 6. 接口返回成功 (200) 的后续操作
  242. DB::transaction(function () use ($record, $u8Data) {
  243. // 更新本地流水状态为成功
  244. $record->update([
  245. 'status' => SyncTempRecord::status_one,
  246. ]);
  247. // 同步维护快照表,保证下次 Command 比对正确
  248. if ($record->op_type == SyncTempRecord::opt_two) {
  249. DB::table('sync_snapshot')
  250. ->where('type', $record->type)
  251. ->where('u8_no', $record->u8_no)
  252. ->where('u8_id', $record->u8_id)
  253. ->delete();
  254. } else {
  255. DB::table('sync_snapshot')->updateOrInsert(
  256. ['type' => $record->type, 'u8_no' => $record->u8_no],
  257. ['u8_id' => $record->u8_id,'last_upd_time' => $record->u8_upd, 'order_date' => $u8Data['order_date'], 'payload' => json_encode($u8Data)]
  258. );
  259. }
  260. });
  261. return [true, ''];
  262. }
  263. public function post_helper($url, $data, $header = [], $timeout = 30)
  264. {
  265. Log::channel('apiLog')->info('入参', ["api" => $url, "param" => $data]);
  266. $ch = curl_init();
  267. curl_setopt($ch, CURLOPT_URL, $url);
  268. curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
  269. curl_setopt($ch, CURLOPT_POST, 1);
  270. curl_setopt($ch, CURLOPT_HTTPHEADER, $header);
  271. curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
  272. curl_setopt($ch, CURLOPT_TIMEOUT, $timeout);
  273. if(!is_null($data)) curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
  274. $r = curl_exec($ch);
  275. if ($r === false) {
  276. $errorMessage = curl_error($ch);
  277. curl_close($ch);
  278. Log::channel('apiLog')->error('CURL错误', ["msg" => $errorMessage]);
  279. return [false, "网络错误: " . $errorMessage];
  280. }
  281. curl_close($ch);
  282. $return = json_decode($r, true);
  283. Log::channel('apiLog')->info('出参', ["res" => $return]);
  284. // 判断逻辑:code 存在且为 200 才算 true
  285. if (isset($return['code']) && $return['code'] == 200) {
  286. return [true, $return];
  287. }
  288. $msg = $return['message'] ?? '未知接口错误';
  289. return [false, $msg];
  290. }
  291. protected function echoMessage(OutputInterface $output)
  292. {
  293. //输出消息
  294. $output->writeln(json_encode($this->data));
  295. }
  296. }