| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- <?php
- namespace App\Jobs;
- use App\Model\SyncTempRecord;
- use Illuminate\Bus\Queueable;
- use Illuminate\Contracts\Queue\ShouldQueue;
- use Illuminate\Foundation\Bus\Dispatchable;
- use Illuminate\Queue\InteractsWithQueue;
- use Illuminate\Queue\SerializesModels;
- use Illuminate\Support\Facades\DB;
- use Illuminate\Support\Facades\Log;
- use Symfony\Component\Console\Output\ConsoleOutput;
- use Symfony\Component\Console\Output\OutputInterface;
/**
 * Queue job that pushes one SyncTempRecord row to the WMS HTTP API.
 *
 * Flow: load the record by id, skip it unless it is still in status_zero,
 * decode its payload, call the matching WMS endpoint, then persist the
 * outcome on the record (status_one on success, status_two plus error_msg on
 * failure) and keep the sync_snapshot table in step.
 */
class ProcessWMSDataJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Seconds reserved, out of the job timeout, for the database work that
     * follows the HTTP call.
     */
    private const HTTP_TIMEOUT_MARGIN = 5;

    /**
     * The nine "lottar" attributes, all sent as empty strings unless overridden.
     */
    private const BLANK_LOTTARS = [
        'lottar1' => '',
        'lottar2' => '',
        'lottar3' => '',
        'lottar4' => '',
        'lottar5' => '',
        'lottar6' => '',
        'lottar7' => '',
        'lottar8' => '',
        'lottar9' => '',
    ];

    /**
     * Job payload. Only the 'id' entry is read by this class; it is used as
     * the SyncTempRecord primary key.
     *
     * @var mixed
     */
    protected $data;

    /**
     * Number of seconds the queue worker allows this job to run.
     *
     * @var int
     */
    public $timeout = 30;

    /**
     * @param mixed $data Row of the record table (per the original note); must expose an 'id' entry.
     */
    public function __construct($data)
    {
        $this->data = $data;
    }

    /**
     * Queue entry point.
     *
     * A failed sync is recorded on the row. Any exception is recorded as well
     * and the job is then deleted from the queue, so it is not retried by the
     * queue itself.
     *
     * @return void
     */
    public function handle()
    {
        try {
            [$ok, $msg] = $this->settle();
            if (!$ok) {
                $this->finalDo($msg);
            }
        } catch (\Throwable $e) {
            // Persist the failure first: that is the state other code reads.
            $this->finalDo("异常:" . $e->getMessage());
            $this->delete();
            // The row only stores the message; keep the stack trace in the log.
            Log::channel('apiLog')->error('WMS_JOB_EXCEPTION', [
                'id' => $this->data['id'] ?? null,
                'msg' => $e->getMessage(),
                'trace' => $e->getTraceAsString(),
            ]);
        }
    }

    /**
     * Record a failure on the row.
     *
     * This runs only on failure/exception paths, so it writes status_two
     * (the failure status used by orderInsert()/zj()). It previously wrote
     * status_one, which the rest of this class treats as "synced", and thereby
     * overwrote the failure status those methods had just stored.
     *
     * @param mixed $msg Error text stored in error_msg.
     */
    private function finalDo($msg): void
    {
        SyncTempRecord::where('id', $this->data['id'])->update([
            'error_msg' => $msg,
            'status' => SyncTempRecord::status_two,
        ]);
    }

    /**
     * Load the record and dispatch it to the matching sync routine.
     *
     * @return array Two-element list: [bool $ok, mixed $message].
     */
    private function settle(): array
    {
        $record = SyncTempRecord::where('id', $this->data['id'])->first();

        // 1. Safety check: nothing to do when the row is gone or is no longer
        //    in the initial status_zero state.
        if (!$record || $record->status != SyncTempRecord::status_zero) {
            return [true, '已处理或记录不存在'];
        }

        // 2. Decode the raw U8 data (the attribute may already be an array).
        $u8Data = is_array($record->payload)
            ? $record->payload
            : json_decode((string) $record->payload, true);

        // An undecodable payload must not reach the WMS as an empty order.
        if (!is_array($u8Data)) {
            return [false, 'payload 解析失败'];
        }

        if ($record->type == SyncTempRecord::type_eight) {
            // Inspection sheet.
            return $this->bjOrder($record, $u8Data);
        }

        // Every other type is synced as an order (to "Yinfei", per the original comment).
        return $this->orderInsert($record, $u8Data);
    }

    /**
     * Send an inbound/outbound order to the WMS and persist the result.
     *
     * @param mixed $record SyncTempRecord row being synced.
     * @param mixed $u8Data Decoded payload; 'details' and 'order_date' are read when present.
     * @return array Two-element list: [bool $ok, mixed $message].
     */
    public function orderInsert($record, $u8Data)
    {
        $orderType = SyncTempRecord::$map[$record->type];
        if (is_array($orderType)) {
            // Some types map to a sub-table keyed by the secondary type.
            $orderType = $orderType[$record->type_2];
        }

        // 3. Build the request body. Array union (+) keeps the left-hand keys,
        //    so the key order of the original payload is preserved.
        $jsonBody = [
            'orderType' => $orderType,
            'orderNo' => $record->u8_no,
            'orderId' => $record->u8_id,
            'operationType' => SyncTempRecord::$opMapping[$record->op_type],
        ] + self::BLANK_LOTTARS + ['details' => []];

        if (isset($u8Data['details']) && is_array($u8Data['details'])) {
            foreach ($u8Data['details'] as $item) {
                $jsonBody['details'][] = [
                    'lineNum' => $item['lineNum'] ?? 0,
                    'materialCode' => $item['materialCode'] ?? '',
                    'planQty' => (float)($item['planQty'] ?? 0),
                    // Only lottar1 is taken from the source line; lottar2..9 stay blank.
                    'lottar1' => $item['lottar1'] ?? "",
                ] + self::BLANK_LOTTARS;
            }
        }

        // 4. Endpoint routing: type_one and type_three go to the inbound
        //    endpoint, everything else to outbound. The comparison is
        //    deliberately loose because the stored type may be a numeric string.
        $inboundTypes = [SyncTempRecord::type_one, SyncTempRecord::type_three];
        $path = in_array($record->type, $inboundTypes) ? '/erp/inbound' : '/erp/outbound';
        $apiUrl = config('wms.api_url') . $path;

        // 5. Call the WMS.
        [$ok, $result] = $this->post_helper(
            $apiUrl,
            $jsonBody,
            ['Content-Type: application/json'],
            $this->httpTimeout()
        );
        if (!$ok) {
            // The API rejected the request or the network call failed.
            $record->update(['status' => SyncTempRecord::status_two, 'error_msg' => $result]);
            return [false, $result];
        }

        // 6. Success bookkeeping.
        $this->markSynced($record, $u8Data);

        return [true, ''];
    }

    /**
     * Sync an inspection sheet.
     *
     * Known limitation (original author's note): this logic is knowingly
     * imperfect because of business constraints. Only the first positive
     * quantity, checked in the order hg, rb, hg_not, is reported. When none is
     * positive no request is sent and the record is left untouched.
     *
     * @param mixed $record SyncTempRecord row being synced.
     * @param mixed $u8Data Decoded payload.
     * @return array Two-element list: [bool $ok, mixed $message].
     */
    public function bjOrder($record, $u8Data)
    {
        $hgQuantity = $u8Data['hg_quantity'] ?? 0;
        $rbQuantity = $u8Data['rb_quantity'] ?? 0;
        $hgNotQuantity = $u8Data['hg_not_quantity'] ?? 0;

        // inspectionStatus codes sent: hg -> 1, rb -> 3, hg_not -> 2
        // (their meaning is defined by the WMS API — TODO confirm).
        if ($hgQuantity > 0) {
            return $this->zj($record, $u8Data, 1);
        }
        if ($rbQuantity > 0) {
            return $this->zj($record, $u8Data, 3);
        }
        if ($hgNotQuantity > 0) {
            return $this->zj($record, $u8Data, 2);
        }

        return [true, ""];
    }

    /**
     * Send one inspection result to the WMS and persist the outcome.
     *
     * TODO (carried over from the original source, no further detail given).
     *
     * @param mixed $record SyncTempRecord row being synced.
     * @param mixed $u8Data Decoded payload; 'materialCode' is required, 'lot' is optional.
     * @param mixed $type   Value sent as inspectionStatus.
     * @return array Two-element list: [bool $ok, mixed $message].
     */
    public function zj($record, $u8Data, $type)
    {
        // 3. Build the request body.
        $jsonBody = [
            'orderNo' => $record->u8_no,
            'materialCode' => $u8Data['materialCode'],
            'lot' => $u8Data['lot'] ?? "",
            'inspectionStatus' => $type,
        ];

        // 4. Endpoint.
        $apiUrl = config('wms.api_url') . '/erp/inspection';

        // 5. Call the WMS.
        [$ok, $result] = $this->post_helper(
            $apiUrl,
            $jsonBody,
            ['Content-Type: application/json'],
            $this->httpTimeout()
        );
        if (!$ok) {
            // The API rejected the request or the network call failed.
            $record->update(['status' => SyncTempRecord::status_two, 'error_msg' => $result]);
            return [false, $result];
        }

        // 6. Success bookkeeping.
        $this->markSynced($record, $u8Data);

        return [true, ''];
    }

    /**
     * POST a JSON body and interpret the WMS response envelope.
     *
     * @param string $url     Target URL.
     * @param mixed  $data    Value to JSON-encode as the body; null sends no body.
     * @param array  $header  Raw HTTP header lines.
     * @param int    $timeout cURL total timeout in seconds.
     * @return array [true, array $decodedResponse] when the response carries code == 200,
     *               otherwise [false, string $errorMessage].
     */
    public function post_helper($url, $data, $header = [], $timeout = 30)
    {
        Log::channel('apiLog')->info('WMS_POST_START', ["api" => $url, "param" => $data]);

        $ch = curl_init();
        try {
            curl_setopt($ch, CURLOPT_URL, $url);
            curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
            curl_setopt($ch, CURLOPT_POST, 1);
            curl_setopt($ch, CURLOPT_HTTPHEADER, $header);
            // NOTE(review): certificate verification is disabled, which allows
            // man-in-the-middle attacks on HTTPS endpoints. Kept as-is because
            // enabling it may break existing deployments; revisit.
            curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
            curl_setopt($ch, CURLOPT_TIMEOUT, $timeout);
            if (!is_null($data)) {
                curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
            }

            $raw = curl_exec($ch);
            // Read the error before the handle is released.
            $curlError = $raw === false ? curl_error($ch) : '';
        } finally {
            curl_close($ch);
        }

        if ($raw === false) {
            Log::channel('apiLog')->error('WMS_CURL_ERROR', ["msg" => $curlError]);
            return [false, "网络错误: " . $curlError];
        }

        $return = json_decode($raw, true);
        Log::channel('apiLog')->info('WMS_POST_RETURN', ["res" => $return]);

        // Success only when 'code' exists and equals 200 (loose comparison, so "200" is accepted).
        if (is_array($return) && isset($return['code']) && $return['code'] == 200) {
            return [true, $return];
        }

        $msg = is_array($return) ? ($return['message'] ?? '未知接口错误') : '未知接口错误';
        if (!is_string($msg)) {
            // Callers store this value in error_msg, so it has to be a string.
            $encoded = json_encode($msg, JSON_UNESCAPED_UNICODE);
            $msg = $encoded === false ? '未知接口错误' : $encoded;
        }

        return [false, $msg];
    }

    /**
     * Write the job payload, JSON-encoded, to the given console output.
     *
     * @param OutputInterface $output
     * @return void
     */
    protected function echoMessage(OutputInterface $output)
    {
        $output->writeln(json_encode($this->data));
    }

    /**
     * HTTP timeout for WMS calls made by this job.
     *
     * It is kept below the job timeout so that a slow WMS surfaces as a cURL
     * error that can be recorded, instead of the worker timing the job out
     * before anything is written.
     */
    private function httpTimeout(): int
    {
        return max(1, (int) $this->timeout - self::HTTP_TIMEOUT_MARGIN);
    }

    /**
     * Mark the record as synced and update sync_snapshot in one transaction,
     * so the next comparison run sees a consistent state.
     *
     * When op_type equals opt_two the matching snapshot row is deleted;
     * otherwise it is inserted or updated.
     *
     * @param mixed $record SyncTempRecord row that was accepted by the WMS.
     * @param mixed $u8Data Decoded payload stored in the snapshot.
     */
    private function markSynced($record, $u8Data): void
    {
        DB::transaction(function () use ($record, $u8Data) {
            $record->update([
                'status' => SyncTempRecord::status_one,
            ]);

            $snapshot = DB::table('sync_snapshot');

            if ($record->op_type == SyncTempRecord::opt_two) {
                $snapshot
                    ->where('type', $record->type)
                    ->where('u8_no', $record->u8_no)
                    ->where('u8_id', $record->u8_id)
                    ->delete();

                return;
            }

            $snapshot->updateOrInsert(
                ['type' => $record->type, 'u8_no' => $record->u8_no],
                [
                    'u8_id' => $record->u8_id,
                    'last_upd_time' => $record->u8_upd,
                    // Not every payload carries order_date; a missing key must
                    // not abort the transaction after the WMS already accepted
                    // the request.
                    'order_date' => $u8Data['order_date'] ?? null,
                    'payload' => json_encode($u8Data),
                ]
            );
        });
    }
}
|