ProcessOssTask.php 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. <?php
  2. namespace App\Jobs;
  3. use App\Model\SysOssTasks;
  4. use App\Service\FileUploadService;
  5. use Illuminate\Bus\Queueable;
  6. use Illuminate\Contracts\Queue\ShouldQueue;
  7. use Illuminate\Foundation\Bus\Dispatchable;
  8. use Illuminate\Queue\InteractsWithQueue;
  9. use Illuminate\Queue\SerializesModels;
  10. use Symfony\Component\Console\Output\ConsoleOutput;
  11. use Symfony\Component\Console\Output\OutputInterface;
  12. class ProcessOssTask implements ShouldQueue
  13. {
  14. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  15. public $timeout = 300;
  16. public function __construct()
  17. {
  18. }
  19. public function handle()
  20. {
  21. $currentBatchIds = [];
  22. try {
  23. $service = new FileUploadService();
  24. SysOssTasks::where('status', 0)
  25. ->chunk(50, function ($tasks) use (&$currentBatchIds, $service) {
  26. // 每次进入新的一批,先记录这一批的所有 ID
  27. $currentBatchIds = $tasks->pluck('id')->toArray();
  28. foreach ($tasks as $task) {
  29. $status_db = 2;
  30. if($task['type'] == 1){
  31. //删除
  32. list($status, $msg) = $service->createOssUploadOldSingle($task->url);
  33. if($status) $status_db = 1;
  34. }elseif ($task['type'] == 2){
  35. //新增
  36. list($status, $msg) = $service->createOssUploadSingle($task->url);
  37. if($status) $status_db = 1;
  38. }else{
  39. $msg = "单条失败:类型不存在";
  40. }
  41. $task->update([
  42. 'status' => $status_db,
  43. 'remark' => $msg,
  44. 'upd_time' => time()
  45. ]);
  46. }
  47. // 处理完这一批后,清空当前批次 ID 记录
  48. $currentBatchIds = [];
  49. });
  50. } catch (\Throwable $e) {
  51. $errorMessage = "当前批次执行中断:" . $e->getMessage();
  52. if (!empty($currentBatchIds)) {
  53. SysOssTasks::whereIn('id', $currentBatchIds)
  54. ->where('status', 0)
  55. ->update([
  56. 'status' => 2,
  57. 'remark' => $errorMessage,
  58. 'upd_time' => time(),
  59. ]);
  60. }
  61. $this->delete();
  62. }
  63. }
  64. protected function echoMessage(OutputInterface $output)
  65. {
  66. //输出消息
  67. $output->writeln(json_encode($this->data));
  68. }
  69. }