settle($time);
            echo '第一步完成------' . PHP_EOL;
            // Process the inventory data
            $this->productInsert($time);
            echo '第二步完成------' . PHP_EOL;
        } catch (\Exception $exception) {
            Log::channel('apiLog')->info('存货异常', ['msg' => $exception->getMessage()]);
        }
    }

    /**
     * Diff the U8 inventory against the local snapshot table and queue sync tasks.
     *
     * The U8 data is read page by page, using the last product code of each page as the
     * cursor for the next one. For every U8 row:
     *   - no snapshot row for its code      -> task with opt_zero (create)
     *   - snapshot ufts differs from U8's   -> task with opt_one (update)
     * After a complete pass, every snapshot code that was not seen in U8 gets a task
     * with opt_two (delete).
     *
     * If a page fetch reports failure, delete detection is skipped for that task:
     * the list of seen codes would be incomplete and every unseen product would be
     * queued for deletion by mistake.
     *
     * @param mixed $time Batch marker stored in crt_time of every queued task.
     */
    public function settle($time)
    {
        $service = new U8ThirtyPartyDatabaseServerService();

        $tasks = [
            'product' => [
                'whereRaw' => "",
                'limit' => 100,
            ]
        ];

        foreach ($tasks as $name => $config) {
            $currentU8Nos = [];
            $lastCode = ""; // pagination cursor: product code
            $fetchCompleted = true;

            while (true) {
                [$status, $items] = $service->getInventoryData($config, $lastCode);

                // NOTE(review): a false status is treated as a failed fetch, matching the
                // [status, result] convention used by post_helper in this class - confirm
                // against getInventoryData.
                if (!$status) {
                    $fetchCompleted = false;
                    break;
                }
                // An empty page means there is nothing (more) to read.
                if (empty($items)) {
                    break;
                }

                $nos = collect($items)->pluck('product_code')->toArray();
                $currentU8Nos = array_merge($currentU8Nos, $nos);

                // Load the local snapshots for this page
                // (note: the snapshot table's ufts column must be a string type).
                $snapshots = DB::table('sync_snapshot_product')
                    ->whereIn('code', $nos)
                    ->get()
                    ->keyBy('code');

                foreach ($items as $item) {
                    $no = $item['product_code'];
                    $u8Ufts = $item['ufts_str']; // arrives in the form '0x000000000005AD12'
                    $snapshot = $snapshots->get($no);

                    $opType = null;
                    if (!$snapshot) {
                        // No local record -> create
                        $opType = SyncTempRecordProduct::opt_zero;
                    } elseif (strtolower((string) $u8Ufts) !== strtolower((string) $snapshot->ufts)) {
                        // Local ufts differs from U8 (compared case-insensitively) -> update.
                        // The string casts keep strtolower() from receiving null.
                        $opType = SyncTempRecordProduct::opt_one;
                    }

                    if ($opType !== null) {
                        $this->createSyncTask($no, $item, $opType, $u8Ufts, $time);
                    }
                }

                // Cursor for the next page: the last code of this batch
                $lastCode = end($items)['product_code'];
            }

            if (!$fetchCompleted) {
                // $currentU8Nos is only partial here; running the delete pass would flag
                // every product that was not reached as deleted.
                Log::channel('apiLog')->error('存货异常', [
                    'msg' => "U8 inventory fetch failed for task '{$name}'; delete detection skipped",
                    'last_code' => $lastCode,
                ]);
                continue;
            }

            // --- Step two: handle deletions ---
            DB::table('sync_snapshot_product')->orderBy('code')->chunk(1000, function ($snapshots) use ($currentU8Nos, $time) {
                $localNos = $snapshots->pluck('code')->toArray();
                // Codes present in the snapshot but absent from U8
                $deletedNos = array_diff($localNos, $currentU8Nos);

                foreach ($deletedNos as $no) {
                    $this->createSyncTask($no, ['product_code' => $no], SyncTempRecordProduct::opt_two, "", $time);
                }
            });
        }
    }

    /**
     * Send the pending sync tasks of one batch to the target system and maintain the snapshot.
     *
     * Reads rows of sync_temp_records_product with the given crt_time and status_zero in
     * chunks of 100, posts each chunk in a single request, then marks the chunk's rows
     * status_one (success) or status_two with the error message (failure). On success the
     * snapshot table is updated per row: deleted for opt_two, upserted otherwise.
     *
     * @param mixed $time Batch marker matched against crt_time.
     */
    public function productInsert($time)
    {
        DB::table('sync_temp_records_product')
            ->where('crt_time', $time)
            ->where('status', SyncTempRecordProduct::status_zero) // only rows still pending
            ->orderBy('id')
            ->chunkById(100, function ($records) {
                $batchData = [];
                $record_ids = [];
                $processedItems = []; // rows whose snapshot is maintained after a successful send

                foreach ($records as $record) {
                    $u8Data = is_array($record->payload) ? $record->payload : json_decode($record->payload, true);

                    // Rows without a usable payload are skipped; their status is left untouched.
                    if (empty($u8Data)) {
                        continue;
                    }

                    // Build the request entry for this row
                    $batchData[] = [
                        'operationType' => SyncTempRecordProduct::$opMapping[$record->op_type],
                        'materialCode' => $u8Data['product_code'] ?? '',
                        'materialName' => $u8Data['product_name'] ?? '',
                        'materialSpec' => $u8Data['product_size'] ?? '',
                        'materialTypeCode' => $u8Data['product_category_code'] ?? '',
                        'materialTypeName' => $u8Data['product_category_name'] ?? '',
                        'unitCode' => $u8Data['product_unit_title'] ?? '',
                        'netWeight' => 0,
                        'grossWeight' => $u8Data['grossWeight'] ?? 0,
                    ];

                    $record_ids[] = $record->id;

                    // Remember this row so the snapshot can be written back after success
                    $processedItems[] = [
                        'code' => $record->code,
                        'ufts' => $record->ufts,
                        'op_type' => $record->op_type,
                        'payload' => $record->payload
                    ];
                }

                if (empty($batchData)) {
                    return;
                }

                // Send the data
                [$status, $msg] = $this->sendToTargetSystem($batchData);

                if (!$status) {
                    // Whole batch failed: record the error on every row
                    SyncTempRecordProduct::whereIn('id', $record_ids)
                        ->update(['status' => SyncTempRecordProduct::status_two, 'error_msg' => $msg]);
                    return;
                }

                // Whole batch succeeded: update the status
                SyncTempRecordProduct::whereIn('id', $record_ids)
                    ->update(['status' => SyncTempRecordProduct::status_one]);

                // --- Maintain the snapshot table ---
                foreach ($processedItems as $item) {
                    if ($item['op_type'] == SyncTempRecordProduct::opt_two) {
                        // Delete operation: remove from the snapshot
                        DB::table('sync_snapshot_product')
                            ->where('code', $item['code'])
                            ->delete();
                    } else {
                        // Create or update: upsert the snapshot row
                        DB::table('sync_snapshot_product')->updateOrInsert(
                            ['code' => $item['code']],
                            [
                                'ufts' => $item['ufts'],
                                'payload' => $item['payload'],
                                'code' => $item['code']
                            ]
                        );
                    }
                }
            });
    }

    /**
     * Post a batch of material entries to the '/erp/material' endpoint.
     *
     * @param array $jsonBody Entries to send; JSON-encoded by post_helper.
     * @return array [bool $status, mixed $result] exactly as returned by post_helper.
     */
    public function sendToTargetSystem($jsonBody)
    {
        // Endpoint route, appended to the configured base URL
        $path = '/erp/material';
        $apiUrl = config('wms.api_url') . $path;

        // Perform the call
        [$status, $result] = $this->post_helper($apiUrl, $jsonBody, ['Content-Type: application/json']);

        return [$status, $result];
    }

    /**
     * Insert one row into the sync task table via SyncTempRecordProduct::create.
     *
     * $ufts carries no default value: an optional parameter placed before the required
     * $time is deprecated in PHP 8 and was already treated as required.
     *
     * @param mixed $no      Product code, stored as 'code'.
     * @param mixed $payload Data stored JSON-encoded as 'payload'.
     * @param mixed $opType  One of the SyncTempRecordProduct::opt_* values.
     * @param mixed $ufts    ufts value stored with the task ("" for deletions).
     * @param mixed $time    Batch marker stored as 'crt_time'.
     */
    private function createSyncTask($no, $payload, $opType, $ufts, $time)
    {
        SyncTempRecordProduct::create([
            'code' => $no,
            'payload' => json_encode($payload),
            'ufts' => $ufts,
            'op_type' => $opType,
            'crt_time'=> $time,
        ]);
    }

    /**
     * POST $data as JSON to $url with cURL and interpret the JSON response.
     *
     * The request and the decoded response are written to the 'apiLog' channel.
     *
     * @param string     $url     Target URL.
     * @param mixed      $data    Request body; JSON-encoded unless null.
     * @param array      $header  Raw HTTP header lines.
     * @param int        $timeout Total request timeout in seconds.
     * @return array [true, array $decodedResponse] when the response has code == 200,
     *               otherwise [false, string $message].
     */
    public function post_helper($url, $data, $header = [], $timeout = 30)
    {
        Log::channel('apiLog')->info('存货同步', ["api" => $url, "param" => $data]);

        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_POST, 1);
        curl_setopt($ch, CURLOPT_HTTPHEADER, $header);
        // NOTE(review): peer certificate verification is disabled, so an HTTPS endpoint
        // is not authenticated. Left unchanged here; confirm whether this is intentional.
        curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
        curl_setopt($ch, CURLOPT_TIMEOUT, $timeout);
        if (!is_null($data)) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
        }

        $r = curl_exec($ch);

        if ($r === false) {
            // Read the error before the handle is closed
            $errorMessage = curl_error($ch);
            curl_close($ch);
            Log::channel('apiLog')->error('存货同步异常:WMS_CURL_ERROR', ["msg" => $errorMessage]);
            return [false, "网络错误: " . $errorMessage];
        }

        curl_close($ch);

        $return = json_decode($r, true);
        Log::channel('apiLog')->info('存货同步返回结果', ["res" => $return]);

        // Success only when 'code' is present and equals 200
        if (isset($return['code']) && $return['code'] == 200) {
            return [true, $return];
        }

        // Covers non-200 codes as well as bodies that are not valid JSON
        $msg = $return['message'] ?? '未知接口错误';
        return [false, $msg];
    }
}