cqp hai 1 mes
pai
achega
9e575cadc2
Modificáronse 2 ficheiros con 60 adicións e 35 borrados
  1. 56 35
      app/Jobs/ProcessOssTask.php
  2. 4 0
      app/Model/SysOssTasks.php

+ 56 - 35
app/Jobs/ProcessOssTask.php

@@ -9,75 +9,96 @@ use Illuminate\Contracts\Queue\ShouldQueue;
 use Illuminate\Foundation\Bus\Dispatchable;
 use Illuminate\Queue\InteractsWithQueue;
 use Illuminate\Queue\SerializesModels;
-use Symfony\Component\Console\Output\ConsoleOutput;
 use Symfony\Component\Console\Output\OutputInterface;
 
 class ProcessOssTask implements ShouldQueue
 {
     use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
 
-    public $timeout = 50;
+    /**
+     * 任务超时时间(秒)
+     * OSS 上传受网络影响较大,设置 120s 较为稳妥
+     */
+    public $timeout = 120;
 
     public function __construct()
     {
-
     }
 
     public function handle()
     {
-        $currentBatchIds = [];
+        $service = new FileUploadService();
 
         try {
-            $service = new FileUploadService();
-            SysOssTasks::where('status', 0)
-                ->chunk(50, function ($tasks) use (&$currentBatchIds, $service) {
-                    // 每次进入新的一批,先记录这一批的所有 ID
-                    $currentBatchIds = $tasks->pluck('id')->toArray();
+            // 1. 提取本次 Job 要处理的 ID 集合 (每次拿 50 条)
+            // 这样可以确保这个 Job 实例拥有这批 ID 的“专属权”
+            $targetIds = SysOssTasks::where('status', 0)
+                ->limit(50)
+                ->pluck('id')
+                ->toArray();
+
+            if (empty($targetIds)) {
+                return; // 没有待处理任务直接退出
+            }
+
+            // 2. 批量锁定:将这批 ID 状态改为 3
+            SysOssTasks::whereIn('id', $targetIds)->update([
+                'status' => SysOssTasks::status_three,
+                'upd_time' => time(),
+            ]);
 
+            // 3. 仅针对这批锁定的 ID 进行处理
+            // 使用 chunkById 预防大数据量下的分页偏移问题
+            SysOssTasks::whereIn('id', $targetIds)
+                ->chunkById(50, function ($tasks) use ($service) {
                     foreach ($tasks as $task) {
-                        $status_db = 2;
-                        if($task['type'] == SysOssTasks::type_one){
-                            //删除
-                            list($status, $msg) = $service->createOssUploadOldSingle($task->url);
-                            if($status) $status_db = 1;
+                        $status_db = SysOssTasks::status_two; // 默认失败
 
-                        }elseif ($task['type'] == SysOssTasks::type_two){
-                            //新增
-                            list($status, $msg) = $service->createOssUploadSingle($task->url);
-                            if($status) $status_db = 1;
-                        }else{
-                            $msg = "单条失败:类型不存在";
+                        try {
+                            if ($task->type == SysOssTasks::type_one) {
+                                // 类型1:删除/同步旧数据逻辑
+                                list($status, $msg) = $service->createOssUploadOldSingle($task->url);
+                                if ($status) $status_db = SysOssTasks::status_one;
+                            } elseif ($task->type == SysOssTasks::type_two) {
+                                // 类型2:新增/上传逻辑
+                                list($status, $msg) = $service->createOssUploadSingle($task->url);
+                                if ($status) $status_db = SysOssTasks::status_one;
+                            } else {
+                                $msg = "单条失败:类型不存在";
+                            }
+                        } catch (\Exception $innerEx) {
+                            $status_db = SysOssTasks::status_two;
+                            $msg = "上传组件异常: " . $innerEx->getMessage();
                         }
 
+                        // 4. 更新最终状态
                         $task->update([
                             'status' => $status_db,
-                            'remark' => $msg,
+                            'remark' => $msg ?? '',
                             'upd_time' => time()
                         ]);
                     }
-
-                    // 处理完这一批后,清空当前批次 ID 记录
-                    $currentBatchIds = [];
                 });
+
         } catch (\Throwable $e) {
-            $errorMessage = "当前批次执行中断:" . $e->getMessage();
-            if (!empty($currentBatchIds)) {
-                SysOssTasks::whereIn('id', $currentBatchIds)
-                    ->where('status', 0)
+            if (!empty($targetIds)) {
+                SysOssTasks::whereIn('id', $targetIds)
+                    ->where('status', SysOssTasks::status_three)
                     ->update([
-                        'status' => 2,
-                        'remark' => $errorMessage,
+                        'status' => SysOssTasks::status_two,
+                        'remark' => $e->getMessage(),
                         'upd_time' => time(),
                     ]);
             }
-
             $this->delete();
         }
     }
 
-    protected function echoMessage(OutputInterface $output)
+    /**
+     * 辅助输出方法
+     */
+    protected function echoMessage(OutputInterface $output, $data)
     {
-        //输出消息
-        $output->writeln(json_encode($this->data));
+        $output->writeln(json_encode($data));
     }
-}
+}

+ 4 - 0
app/Model/SysOssTasks.php

@@ -15,4 +15,8 @@ class SysOssTasks extends Model
     const type_one = 1; //删除
     const type_two = 2; //新增
     const job = 'sync_oss_file';
+
+    const status_one = 1; // 成功
+    const status_two = 2; // 失败
+    const status_three = 3; // 待处理 锁定
 }