佇列
簡介
在建構您的 Web 應用程式時,您可能會有一些任務,例如解析和儲存上傳的 CSV 檔案,這些任務在典型的 Web 請求期間執行時間過長。 值得慶幸的是,Laravel 讓您可以輕鬆建立佇列任務,這些任務可以在背景中處理。 通過將耗時的任務移動到佇列,您的應用程式可以閃電般的速度回應 Web 請求,並為您的客戶提供更好的使用者體驗。
Laravel 佇列在各種不同的佇列後端(例如 Amazon SQS、Redis 甚至關聯式資料庫)上提供統一的佇列 API。
Laravel 的佇列設定選項儲存在您應用程式的 config/queue.php
設定檔中。 在此檔案中,您將找到框架隨附的每個佇列驅動程式的連線設定,包括 database、Amazon SQS、Redis 和 Beanstalkd 驅動程式,以及將立即執行任務的同步驅動程式(用於本地開發期間)。 還包含一個 null
佇列驅動程式,它會捨棄佇列任務。
Laravel 現在提供 Horizon,這是一個適用於您的 Redis 驅動佇列的美觀儀表板和設定系統。 請查看完整的 Horizon 文件以獲取更多資訊。
連線 vs. 佇列
在開始使用 Laravel 佇列之前,務必了解「連線」和「佇列」之間的區別。 在您的 config/queue.php
設定檔中,有一個 connections
設定陣列。 此選項定義與後端佇列服務(例如 Amazon SQS、Beanstalk 或 Redis)的連線。 但是,任何給定的佇列連線都可能有多個「佇列」,這些佇列可以被認為是不同的佇列任務堆疊或堆。
請注意,queue
設定檔中的每個連線設定範例都包含一個 queue
屬性。 這是當任務被傳送到給定連線時,任務將被調度到的預設佇列。 換句話說,如果您調度任務時未明確定義應將其調度到哪個佇列,則任務將被放置在連線設定的 queue
屬性中定義的佇列上。
1use App\Jobs\ProcessPodcast;2 3// This job is sent to the default connection's default queue...4ProcessPodcast::dispatch();5 6// This job is sent to the default connection's "emails" queue...7ProcessPodcast::dispatch()->onQueue('emails');
某些應用程式可能永遠不需要將任務推送到多個佇列,而是傾向於使用一個簡單的佇列。 但是,將任務推送到多個佇列對於希望優先處理或區隔任務處理方式的應用程式特別有用,因為 Laravel 佇列工作程序允許您按優先順序指定它應處理哪些佇列。 例如,如果您將任務推送到 high
佇列,您可以執行一個工作程序,使其具有更高的處理優先順序。
1php artisan queue:work --queue=high,default
驅動程式注意事項與先決條件
資料庫
為了使用 database
佇列驅動程式,您需要一個資料庫表來保存任務。 通常,這包含在 Laravel 的預設 0001_01_01_000002_create_jobs_table.php
資料庫遷移中;但是,如果您的應用程式不包含此遷移,您可以使用 make:queue-table
Artisan 命令來建立它。
1php artisan make:queue-table2 3php artisan migrate
Redis
為了使用 redis
佇列驅動程式,您應該在您的 config/database.php
設定檔中設定 Redis 資料庫連線。
redis
佇列驅動程式不支援 serializer
和 compression
Redis 選項。
Redis 集群
如果您的 Redis 佇列連線使用 Redis 集群,則您的佇列名稱必須包含 金鑰雜湊標籤。 這是必需的,以確保給定佇列的所有 Redis 金鑰都放置在同一個雜湊槽中。
1'redis' => [2 'driver' => 'redis',3 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),4 'queue' => env('REDIS_QUEUE', '{default}'),5 'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),6 'block_for' => null,7 'after_commit' => false,8],
封鎖
當使用 Redis 佇列時,您可以使用 block_for
設定選項來指定驅動程式應等待任務可用的時間長度,然後再迭代工作程序迴圈並重新輪詢 Redis 資料庫。
根據您的佇列負載調整此值可能比持續輪詢 Redis 資料庫以尋找新任務更有效率。 例如,您可以將值設定為 5
,以指示驅動程式應封鎖五秒鐘,同時等待任務可用。
1'redis' => [2 'driver' => 'redis',3 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),4 'queue' => env('REDIS_QUEUE', 'default'),5 'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),6 'block_for' => 5,7 'after_commit' => false,8],
將 block_for
設定為 0
將導致佇列工作程序無限期封鎖,直到任務可用為止。 這也將阻止處理諸如 SIGTERM
之類的訊號,直到下一個任務被處理完畢。
其他驅動程式先決條件
列出的佇列驅動程式需要以下依賴項。 這些依賴項可以通過 Composer 套件管理器安裝。
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~5.0
- Redis:
predis/predis ~2.0
或 phpredis PHP 擴展 - MongoDB:
mongodb/laravel-mongodb
建立任務
生成任務類別
預設情況下,您應用程式的所有可佇列任務都儲存在 app/Jobs
目錄中。 如果 app/Jobs
目錄不存在,則在您執行 make:job
Artisan 命令時將會建立它。
1php artisan make:job ProcessPodcast
生成的類別將實現 Illuminate\Contracts\Queue\ShouldQueue
介面,向 Laravel 指示任務應推送到佇列以非同步執行。
任務存根可以使用 存根發佈進行自訂。
類別結構
任務類別非常簡單,通常只包含一個 handle
方法,該方法在任務由佇列處理時調用。 為了開始使用,讓我們看一下一個範例任務類別。 在此範例中,我們將假設我們管理一個播客發佈服務,並且需要在發佈之前處理上傳的播客檔案。
1<?php 2 3namespace App\Jobs; 4 5use App\Models\Podcast; 6use App\Services\AudioProcessor; 7use Illuminate\Contracts\Queue\ShouldQueue; 8use Illuminate\Foundation\Queue\Queueable; 9 10class ProcessPodcast implements ShouldQueue11{12 use Queueable;13 14 /**15 * Create a new job instance.16 */17 public function __construct(18 public Podcast $podcast,19 ) {}20 21 /**22 * Execute the job.23 */24 public function handle(AudioProcessor $processor): void25 {26 // Process uploaded podcast...27 }28}
在此範例中,請注意,我們能夠將 Eloquent 模型直接傳遞到佇列任務的建構函式中。 由於任務正在使用的 Queueable
trait,因此當任務正在處理時,Eloquent 模型及其載入的關聯將被優雅地序列化和反序列化。
如果您的佇列任務在其建構函式中接受 Eloquent 模型,則只有模型的標識符將被序列化到佇列上。 當任務實際處理時,佇列系統將自動從資料庫中重新檢索完整的模型實例及其載入的關聯。 這種模型序列化方法允許將更小的任務有效負載傳送到您的佇列驅動程式。
handle
方法依賴注入
當任務由佇列處理時,將調用 handle
方法。 請注意,我們能夠在任務的 handle
方法上類型提示依賴項。 Laravel 服務容器會自動注入這些依賴項。
如果您想完全控制容器如何將依賴項注入到 handle
方法中,您可以使用容器的 bindMethod
方法。 bindMethod
方法接受一個回調,該回調接收任務和容器。 在回調中,您可以隨意調用 handle
方法。 通常,您應該從 App\Providers\AppServiceProvider
服務提供者的 boot
方法中調用此方法。
1use App\Jobs\ProcessPodcast;2use App\Services\AudioProcessor;3use Illuminate\Contracts\Foundation\Application;4 5$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {6 return $job->handle($app->make(AudioProcessor::class));7});
二進制資料(例如原始影像內容)應在傳遞到佇列任務之前通過 base64_encode
函式傳遞。 否則,當放置在佇列上時,任務可能無法正確序列化為 JSON。
佇列關聯
由於所有載入的 Eloquent 模型關聯也會在任務排隊時序列化,因此序列化的任務字串有時可能會變得非常大。 此外,當任務反序列化並且模型關聯從資料庫重新檢索時,它們將被完整地檢索。 在任務排隊過程中模型序列化之前應用的任何先前的關聯約束將在任務反序列化時不會應用。 因此,如果您希望使用給定關聯的子集,則應在您的佇列任務中重新約束該關聯。
或者,為了防止關聯被序列化,您可以在設定屬性值時調用模型上的 withoutRelations
方法。 此方法將傳回沒有其載入的關聯的模型實例。
1/**2 * Create a new job instance.3 */4public function __construct(5 Podcast $podcast,6) {7 $this->podcast = $podcast->withoutRelations();8}
如果您正在使用 PHP 建構函式屬性提升,並且想要指示不應序列化 Eloquent 模型的關聯,則可以使用 WithoutRelations
屬性。
1use Illuminate\Queue\Attributes\WithoutRelations;2 3/**4 * Create a new job instance.5 */6public function __construct(7 #[WithoutRelations]8 public Podcast $podcast,9) {}
如果任務收到 Eloquent 模型集合或陣列而不是單個模型,則當任務反序列化並執行時,該集合中的模型將不會恢復其關聯。 這是為了防止在處理大量模型的任務上過度使用資源。
獨特任務
獨特任務需要支援 鎖定的快取驅動程式。 目前,memcached
、redis
、dynamodb
、database
、file
和 array
快取驅動程式支援原子鎖定。 此外,獨特任務約束不適用於批次中的任務。
有時,您可能想要確保在任何時間點佇列上只有一個特定任務的實例。 您可以通過在您的任務類別上實現 ShouldBeUnique
介面來做到這一點。 此介面不要求您在類別上定義任何其他方法。
1<?php2 3use Illuminate\Contracts\Queue\ShouldQueue;4use Illuminate\Contracts\Queue\ShouldBeUnique;5 6class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique7{8 // ...9}
在上面的範例中,UpdateSearchIndex
任務是獨特的。 因此,如果佇列上已存在任務的另一個實例且尚未完成處理,則不會調度該任務。
在某些情況下,您可能想要定義一個特定的「金鑰」,使任務獨特,或者您可能想要指定一個超時時間,超過該時間任務不再保持獨特。 為了實現這一點,您可以在您的任務類別上定義 uniqueId
和 uniqueFor
屬性或方法。
1<?php 2 3use App\Models\Product; 4use Illuminate\Contracts\Queue\ShouldQueue; 5use Illuminate\Contracts\Queue\ShouldBeUnique; 6 7class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique 8{ 9 /**10 * The product instance.11 *12 * @var \App\Product13 */14 public $product;15 16 /**17 * The number of seconds after which the job's unique lock will be released.18 *19 * @var int20 */21 public $uniqueFor = 3600;22 23 /**24 * Get the unique ID for the job.25 */26 public function uniqueId(): string27 {28 return $this->product->id;29 }30}
在上面的範例中,UpdateSearchIndex
任務按產品 ID 是獨特的。 因此,任何使用相同產品 ID 的任務的新調度都將被忽略,直到現有任務完成處理。 此外,如果現有任務在一小時內未處理完畢,則獨特鎖將被釋放,並且可以使用相同獨特金鑰的另一個任務調度到佇列。
如果您的應用程式從多個 Web 伺服器或容器調度任務,則您應確保您的所有伺服器都與同一個中央快取伺服器通訊,以便 Laravel 可以準確地判斷任務是否獨特。
保持任務獨特直到處理開始
預設情況下,獨特任務在任務完成處理或所有重試嘗試失敗後「解鎖」。 但是,在某些情況下,您可能希望您的任務在處理之前立即解鎖。 為了實現這一點,您的任務應實現 ShouldBeUniqueUntilProcessing
契約,而不是 ShouldBeUnique
契約。
1<?php 2 3use App\Models\Product; 4use Illuminate\Contracts\Queue\ShouldQueue; 5use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing; 6 7class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing 8{ 9 // ...10}
獨特任務鎖
在幕後,當調度 ShouldBeUnique
任務時,Laravel 嘗試使用 uniqueId
金鑰獲取 鎖。 如果未獲取鎖,則不會調度任務。 當任務完成處理或所有重試嘗試失敗時,此鎖將被釋放。 預設情況下,Laravel 將使用預設快取驅動程式來獲取此鎖。 但是,如果您希望使用另一個驅動程式來獲取鎖,您可以定義一個 uniqueVia
方法,該方法傳回應使用的快取驅動程式。
1use Illuminate\Contracts\Cache\Repository; 2use Illuminate\Support\Facades\Cache; 3 4class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique 5{ 6 // ... 7 8 /** 9 * Get the cache driver for the unique job lock.10 */11 public function uniqueVia(): Repository12 {13 return Cache::driver('redis');14 }15}
如果您只需要限制任務的並行處理,請改用 WithoutOverlapping
任務中介層。
加密任務
Laravel 允許您通過 加密來確保任務資料的隱私和完整性。 為了開始使用,只需將 ShouldBeEncrypted
介面新增到任務類別即可。 將此介面新增到類別後,Laravel 將在將您的任務推送到佇列之前自動加密您的任務。
1<?php2 3use Illuminate\Contracts\Queue\ShouldBeEncrypted;4use Illuminate\Contracts\Queue\ShouldQueue;5 6class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted7{8 // ...9}
任務中介層
任務中介層允許您在佇列任務的執行周圍包裝自訂邏輯,從而減少任務本身中的樣板程式碼。 例如,考慮以下 handle
方法,該方法利用 Laravel 的 Redis 速率限制功能,以允許每五秒鐘僅處理一個任務。
1use Illuminate\Support\Facades\Redis; 2 3/** 4 * Execute the job. 5 */ 6public function handle(): void 7{ 8 Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () { 9 info('Lock obtained...');10 11 // Handle job...12 }, function () {13 // Could not obtain lock...14 15 return $this->release(5);16 });17}
雖然此程式碼有效,但 handle
方法的實現變得繁瑣,因為它雜亂地包含 Redis 速率限制邏輯。 此外,對於我們想要速率限制的任何其他任務,都必須複製此速率限制邏輯。
我們可以定義一個處理速率限制的任務中介層,而不是在 handle 方法中進行速率限制。 Laravel 沒有任務中介層的預設位置,因此您可以將任務中介層放置在應用程式中的任何位置。 在此範例中,我們將中介層放置在 app/Jobs/Middleware
目錄中。
1<?php 2 3namespace App\Jobs\Middleware; 4 5use Closure; 6use Illuminate\Support\Facades\Redis; 7 8class RateLimited 9{10 /**11 * Process the queued job.12 *13 * @param \Closure(object): void $next14 */15 public function handle(object $job, Closure $next): void16 {17 Redis::throttle('key')18 ->block(0)->allow(1)->every(5)19 ->then(function () use ($job, $next) {20 // Lock obtained...21 22 $next($job);23 }, function () use ($job) {24 // Could not obtain lock...25 26 $job->release(5);27 });28 }29}
如您所見,與 路由中介層一樣,任務中介層接收正在處理的任務和應調用以繼續處理任務的回調。
建立任務中介層後,可以通過從任務的 middleware
方法傳回它們來將它們附加到任務。 此方法在由 make:job
Artisan 命令建立的任務上不存在,因此您需要手動將其新增到您的任務類別。
1use App\Jobs\Middleware\RateLimited; 2 3/** 4 * Get the middleware the job should pass through. 5 * 6 * @return array<int, object> 7 */ 8public function middleware(): array 9{10 return [new RateLimited];11}
任務中介層也可以分配給可佇列的事件偵聽器、郵件和通知。
速率限制
雖然我們剛才演示了如何編寫您自己的速率限制任務中介層,但 Laravel 實際上包含一個您可以利用來速率限制任務的速率限制中介層。 與 路由速率限制器一樣,任務速率限制器是使用 RateLimiter
facade 的 for
方法定義的。
例如,您可能希望允許使用者每小時備份一次他們的資料,同時對高級客戶不施加此類限制。 為了實現這一點,您可以在 AppServiceProvider
的 boot
方法中定義 RateLimiter
。
1use Illuminate\Cache\RateLimiting\Limit; 2use Illuminate\Support\Facades\RateLimiter; 3 4/** 5 * Bootstrap any application services. 6 */ 7public function boot(): void 8{ 9 RateLimiter::for('backups', function (object $job) {10 return $job->user->vipCustomer()11 ? Limit::none()12 : Limit::perHour(1)->by($job->user->id);13 });14}
在上面的範例中,我們定義了一個每小時速率限制; 但是,您可以使用 perMinute
方法輕鬆定義基於分鐘的速率限制。 此外,您可以將任何您想要的值傳遞給速率限制的 by
方法; 但是,此值最常用於按客戶區分速率限制。
1return Limit::perMinute(50)->by($job->user->id);
定義速率限制後,您可以使用 Illuminate\Queue\Middleware\RateLimited
中介層將速率限制器附加到您的任務。 每次任務超過速率限制時,此中介層都會根據速率限制持續時間將任務釋放回佇列,並帶有適當的延遲。
1use Illuminate\Queue\Middleware\RateLimited; 2 3/** 4 * Get the middleware the job should pass through. 5 * 6 * @return array<int, object> 7 */ 8public function middleware(): array 9{10 return [new RateLimited('backups')];11}
將速率限制任務釋放回佇列仍會增加任務的總 attempts
數。 您可能希望相應地調整任務類別上的 tries
和 maxExceptions
屬性。 或者,您可能希望使用 retryUntil
方法來定義任務應停止嘗試的時間量。
如果您不希望在任務受到速率限制時重試任務,則可以使用 dontRelease
方法。
1/**2 * Get the middleware the job should pass through.3 *4 * @return array<int, object>5 */6public function middleware(): array7{8 return [(new RateLimited('backups'))->dontRelease()];9}
如果您正在使用 Redis,則可以使用 Illuminate\Queue\Middleware\RateLimitedWithRedis
中介層,該中介層針對 Redis 進行了微調,並且比基本速率限制中介層更有效率。
防止任務重疊
Laravel 包含一個 Illuminate\Queue\Middleware\WithoutOverlapping
中介層,允許您根據任意金鑰防止任務重疊。 當佇列任務正在修改應一次僅由一個任務修改的資源時,這會很有幫助。
例如,假設您有一個佇列任務,用於更新使用者的信用評分,並且您想要防止同一使用者 ID 的信用評分更新任務重疊。 為了實現這一點,您可以從任務的 middleware
方法傳回 WithoutOverlapping
中介層。
1use Illuminate\Queue\Middleware\WithoutOverlapping; 2 3/** 4 * Get the middleware the job should pass through. 5 * 6 * @return array<int, object> 7 */ 8public function middleware(): array 9{10 return [new WithoutOverlapping($this->user->id)];11}
任何相同類型的重疊任務都將被釋放回佇列。 您還可以指定必須經過的秒數,然後才能再次嘗試釋放的任務。
1/**2 * Get the middleware the job should pass through.3 *4 * @return array<int, object>5 */6public function middleware(): array7{8 return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];9}
如果您希望立即刪除任何重疊的任務,以便不會重試它們,則可以使用 dontRelease
方法。
1/**2 * Get the middleware the job should pass through.3 *4 * @return array<int, object>5 */6public function middleware(): array7{8 return [(new WithoutOverlapping($this->order->id))->dontRelease()];9}
WithoutOverlapping
中介層由 Laravel 的原子鎖定功能提供支援。 有時,您的任務可能會意外失敗或超時,以至於鎖未被釋放。 因此,您可以使用 expireAfter
方法顯式定義鎖過期時間。 例如,下面的範例將指示 Laravel 在任務開始處理三分鐘後釋放 WithoutOverlapping
鎖。
1/**2 * Get the middleware the job should pass through.3 *4 * @return array<int, object>5 */6public function middleware(): array7{8 return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];9}
WithoutOverlapping
中介層需要支援 鎖定的快取驅動程式。 目前,memcached
、redis
、dynamodb
、database
、file
和 array
快取驅動程式支援原子鎖定。
在任務類別之間共享鎖金鑰
預設情況下,WithoutOverlapping
中介層只會防止相同類別的任務重疊。 因此,雖然兩個不同的任務類別可能使用相同的鎖金鑰,但它們不會被阻止重疊。 但是,您可以使用 shared
方法指示 Laravel 在任務類別之間應用金鑰。
1use Illuminate\Queue\Middleware\WithoutOverlapping; 2 3class ProviderIsDown 4{ 5 // ... 6 7 public function middleware(): array 8 { 9 return [10 (new WithoutOverlapping("status:{$this->provider}"))->shared(),11 ];12 }13}14 15class ProviderIsUp16{17 // ...18 19 public function middleware(): array20 {21 return [22 (new WithoutOverlapping("status:{$this->provider}"))->shared(),23 ];24 }25}
節流異常
Laravel 包含一個 Illuminate\Queue\Middleware\ThrottlesExceptions
中介層,允許您節流異常。 一旦任務拋出給定數量的異常,所有進一步嘗試執行任務的操作都會延遲,直到指定的時間間隔過去。 此中介層對於與不穩定的第三方服務互動的任務特別有用。
例如,假設一個佇列任務與開始拋出異常的第三方 API 互動。 為了節流異常,您可以從任務的 middleware
方法傳回 ThrottlesExceptions
中介層。 通常,此中介層應與實現 基於時間的嘗試的任務配對。
1use DateTime; 2use Illuminate\Queue\Middleware\ThrottlesExceptions; 3 4/** 5 * Get the middleware the job should pass through. 6 * 7 * @return array<int, object> 8 */ 9public function middleware(): array10{11 return [new ThrottlesExceptions(10, 5 * 60)];12}13 14/**15 * Determine the time at which the job should timeout.16 */17public function retryUntil(): DateTime18{19 return now()->addMinutes(30);20}
中介層接受的第一個建構函式參數是任務在被節流之前可以拋出的異常數量,而第二個建構函式參數是任務在被節流後應等待的時間秒數,然後再嘗試任務。 在上面的程式碼範例中,如果任務拋出 10 個連續異常,我們將等待 5 分鐘,然後再嘗試任務,並受 30 分鐘時間限制的約束。
當任務拋出異常但尚未達到異常閾值時,任務通常會立即重試。 但是,您可以通過在將中介層附加到任務時調用 backoff
方法來指定此類任務應延遲的分鐘數。
1use Illuminate\Queue\Middleware\ThrottlesExceptions; 2 3/** 4 * Get the middleware the job should pass through. 5 * 6 * @return array<int, object> 7 */ 8public function middleware(): array 9{10 return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];11}
在內部,此中介層使用 Laravel 的快取系統來實現速率限制,並且任務的類別名稱用作快取「金鑰」。 您可以通過在將中介層附加到任務時調用 by
方法來覆蓋此金鑰。 如果您有多個任務與同一個第三方服務互動,並且您希望它們共享一個通用的節流「儲存桶」,則這可能會很有用。
1use Illuminate\Queue\Middleware\ThrottlesExceptions; 2 3/** 4 * Get the middleware the job should pass through. 5 * 6 * @return array<int, object> 7 */ 8public function middleware(): array 9{10 return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];11}
預設情況下,此中介層將節流每個異常。 您可以通過在將中介層附加到任務時調用 when
方法來修改此行為。 然後,僅當提供給 when
方法的閉包傳回 true
時,才會節流異常。
1use Illuminate\Http\Client\HttpClientException; 2use Illuminate\Queue\Middleware\ThrottlesExceptions; 3 4/** 5 * Get the middleware the job should pass through. 6 * 7 * @return array<int, object> 8 */ 9public function middleware(): array10{11 return [(new ThrottlesExceptions(10, 10 * 60))->when(12 fn (Throwable $throwable) => $throwable instanceof HttpClientException13 )];14}
如果您希望將節流的異常報告給應用程式的異常處理程序,您可以通過在將中介層附加到任務時調用 report
方法來實現。 或者,您可以提供一個閉包給 report
方法,並且僅當給定的閉包傳回 true
時,才會報告異常。
1use Illuminate\Http\Client\HttpClientException; 2use Illuminate\Queue\Middleware\ThrottlesExceptions; 3 4/** 5 * Get the middleware the job should pass through. 6 * 7 * @return array<int, object> 8 */ 9public function middleware(): array10{11 return [(new ThrottlesExceptions(10, 10 * 60))->report(12 fn (Throwable $throwable) => $throwable instanceof HttpClientException13 )];14}
如果您正在使用 Redis,則可以使用 Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis
中介層,該中介層針對 Redis 進行了微調,並且比基本異常節流中介層更有效率。
跳過任務
Skip
中介層允許您指定應跳過/刪除任務,而無需修改任務的邏輯。 如果給定的條件評估為 true
,則 Skip::when
方法將刪除任務,而如果條件評估為 false
,則 Skip::unless
方法將刪除任務。
1use Illuminate\Queue\Middleware\Skip; 2 3/** 4 * Get the middleware the job should pass through. 5 */ 6public function middleware(): array 7{ 8 return [ 9 Skip::when($someCondition),10 ];11}
您還可以將 Closure
傳遞給 when
和 unless
方法,以進行更複雜的條件評估。
1use Illuminate\Queue\Middleware\Skip; 2 3/** 4 * Get the middleware the job should pass through. 5 */ 6public function middleware(): array 7{ 8 return [ 9 Skip::when(function (): bool {10 return $this->shouldSkip();11 }),12 ];13}
調度任務
編寫完任務類別後,您可以使用任務本身的 dispatch
方法來調度它。 傳遞給 dispatch
方法的參數將被賦予任務的建構函式。
1<?php 2 3namespace App\Http\Controllers; 4 5use App\Http\Controllers\Controller; 6use App\Jobs\ProcessPodcast; 7use App\Models\Podcast; 8use Illuminate\Http\RedirectResponse; 9use Illuminate\Http\Request;10 11class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);19 20 // ...21 22 ProcessPodcast::dispatch($podcast);23 24 return redirect('/podcasts');25 }26}
如果您想有條件地調度任務,可以使用 dispatchIf
和 dispatchUnless
方法。
1ProcessPodcast::dispatchIf($accountActive, $podcast);2 3ProcessPodcast::dispatchUnless($accountSuspended, $podcast);
在新的 Laravel 應用程式中,sync
驅動程式是預設的佇列驅動程式。 此驅動程式在目前請求的前景中同步執行任務,這在本地開發期間通常很方便。 如果您想實際開始佇列任務以進行背景處理,您可以在應用程式的 config/queue.php
設定檔中指定不同的佇列驅動程式。
延遲調度
如果您想指定任務不應立即供佇列工作程序處理,您可以在調度任務時使用 delay
方法。 例如,讓我們指定任務在調度後 10 分鐘內不應可用於處理。
1<?php 2 3namespace App\Http\Controllers; 4 5use App\Http\Controllers\Controller; 6use App\Jobs\ProcessPodcast; 7use App\Models\Podcast; 8use Illuminate\Http\RedirectResponse; 9use Illuminate\Http\Request;10 11class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);19 20 // ...21 22 ProcessPodcast::dispatch($podcast)23 ->delay(now()->addMinutes(10));24 25 return redirect('/podcasts');26 }27}
在某些情況下,任務可能配置了預設延遲。 如果您需要繞過此延遲並調度任務以立即處理,則可以使用 withoutDelay
方法。
1ProcessPodcast::dispatch($podcast)->withoutDelay();
Amazon SQS 佇列服務的最大延遲時間為 15 分鐘。
在回應傳送到瀏覽器後調度
或者,如果您的 Web 伺服器正在使用 FastCGI,則 dispatchAfterResponse
方法會延遲調度任務,直到 HTTP 回應傳送到使用者的瀏覽器之後。 即使佇列任務仍在執行,這仍然允許使用者開始使用應用程式。 這通常僅應用於大約需要一秒鐘的任務,例如發送電子郵件。 由於它們在目前的 HTTP 請求中處理,因此以這種方式調度的任務不需要佇列工作程序正在運行才能處理它們。
1use App\Jobs\SendNotification;2 3SendNotification::dispatchAfterResponse();
您也可以 dispatch
閉包,並將 afterResponse
方法鏈接到 dispatch
輔助方法上,以便在 HTTP 回應已發送到瀏覽器後執行閉包
1use App\Mail\WelcomeMessage;2use Illuminate\Support\Facades\Mail;3 4dispatch(function () {6})->afterResponse();
同步調度
如果您想要立即(同步地)調度任務,可以使用 dispatchSync
方法。當使用此方法時,任務將不會被放入佇列,而是會立即在目前的程序中執行
1<?php 2 3namespace App\Http\Controllers; 4 5use App\Http\Controllers\Controller; 6use App\Jobs\ProcessPodcast; 7use App\Models\Podcast; 8use Illuminate\Http\RedirectResponse; 9use Illuminate\Http\Request;10 11class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);19 20 // Create podcast...21 22 ProcessPodcast::dispatchSync($podcast);23 24 return redirect('/podcasts');25 }26}
任務 & 資料庫事務
雖然在資料庫交易中調度任務是完全可以的,但您應該特別注意確保您的任務實際上能夠成功執行。在交易中調度任務時,任務有可能在父交易提交之前就被 worker 處理。當這種情況發生時,您在資料庫交易期間對模型或資料庫記錄所做的任何更新可能尚未反映在資料庫中。此外,在交易中建立的任何模型或資料庫記錄可能還不存在於資料庫中。
幸運的是,Laravel 提供了幾種方法來解決這個問題。首先,您可以在佇列連線的配置陣列中設定 after_commit
連線選項
1'redis' => [2 'driver' => 'redis',3 // ...4 'after_commit' => true,5],
當 after_commit
選項為 true
時,您可以在資料庫交易中調度任務;但是,Laravel 會等到開啟的父資料庫交易都提交後,才會實際調度任務。當然,如果目前沒有開啟任何資料庫交易,任務將會立即被調度。
如果交易由於交易期間發生的異常而回滾,則在該交易期間調度的任務將被丟棄。
將 after_commit
配置選項設定為 true
也會導致任何排隊的事件監聽器、郵件、通知和廣播事件在所有開啟的資料庫交易都提交後才被調度。
指定 Commit 調度行為內聯
如果您沒有將 after_commit
佇列連線配置選項設定為 true
,您仍然可以指示特定的任務應該在所有開啟的資料庫交易都提交後才被調度。要完成此操作,您可以將 afterCommit
方法鏈接到您的調度操作上
1use App\Jobs\ProcessPodcast;2 3ProcessPodcast::dispatch($podcast)->afterCommit();
同樣地,如果 after_commit
配置選項設定為 true
,您可以指示特定的任務應該立即調度,而無需等待任何開啟的資料庫交易提交
1ProcessPodcast::dispatch($podcast)->beforeCommit();
任務鏈
任務鏈允許您指定一系列排隊的任務,這些任務應該在主要任務成功執行後按順序執行。如果序列中的一個任務失敗,其餘的任務將不會被執行。要執行排隊的任務鏈,您可以使用 Bus
facade 提供的 chain
方法。Laravel 的命令總線是一個更底層的元件,排隊任務調度是建立在其之上的
1use App\Jobs\OptimizePodcast; 2use App\Jobs\ProcessPodcast; 3use App\Jobs\ReleasePodcast; 4use Illuminate\Support\Facades\Bus; 5 6Bus::chain([ 7 new ProcessPodcast, 8 new OptimizePodcast, 9 new ReleasePodcast,10])->dispatch();
除了鏈接任務類別實例之外,您還可以鏈接閉包
1Bus::chain([2 new ProcessPodcast,3 new OptimizePodcast,4 function () {5 Podcast::update(/* ... */);6 },7])->dispatch();
在任務中使用 $this->delete()
方法刪除任務不會阻止鏈接任務被處理。只有當鏈中的任務失敗時,鏈才會停止執行。
鏈接連線和佇列
如果您想指定鏈接任務應該使用的連線和佇列,可以使用 onConnection
和 onQueue
方法。這些方法指定應該使用的佇列連線和佇列名稱,除非排隊任務被明確分配了不同的連線 / 佇列
1Bus::chain([2 new ProcessPodcast,3 new OptimizePodcast,4 new ReleasePodcast,5])->onConnection('redis')->onQueue('podcasts')->dispatch();
將任務添加到鏈中
有時,您可能需要在現有任務鏈中的另一個任務中,將一個任務前置或附加到該鏈中。您可以使用 prependToChain
和 appendToChain
方法來完成此操作
1/** 2 * Execute the job. 3 */ 4public function handle(): void 5{ 6 // ... 7 8 // Prepend to the current chain, run job immediately after current job... 9 $this->prependToChain(new TranscribePodcast);10 11 // Append to the current chain, run job at end of chain...12 $this->appendToChain(new TranscribePodcast);13}
鏈接失敗
當鏈接任務時,您可以使用 catch
方法來指定一個閉包,如果鏈中的任務失敗,則應該調用該閉包。給定的回呼將接收導致任務失敗的 Throwable
實例
1use Illuminate\Support\Facades\Bus; 2use Throwable; 3 4Bus::chain([ 5 new ProcessPodcast, 6 new OptimizePodcast, 7 new ReleasePodcast, 8])->catch(function (Throwable $e) { 9 // A job within the chain has failed...10})->dispatch();
由於鏈回呼是由 Laravel 佇列序列化並稍後執行的,因此您不應該在鏈回呼中使用 $this
變數。
自訂佇列和連線
調度到特定的佇列
透過將任務推送到不同的佇列,您可以「分類」您的排隊任務,甚至可以優先考慮您分配給各種佇列的 worker 數量。請記住,這不會將任務推送到您的佇列設定檔中定義的不同佇列「連線」,而只是推送到單個連線中的特定佇列。要指定佇列,請在調度任務時使用 onQueue
方法
1<?php 2 3namespace App\Http\Controllers; 4 5use App\Http\Controllers\Controller; 6use App\Jobs\ProcessPodcast; 7use App\Models\Podcast; 8use Illuminate\Http\RedirectResponse; 9use Illuminate\Http\Request;10 11class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);19 20 // Create podcast...21 22 ProcessPodcast::dispatch($podcast)->onQueue('processing');23 24 return redirect('/podcasts');25 }26}
或者,您也可以在任務的建構子中呼叫 onQueue
方法來指定任務的佇列
1<?php 2 3namespace App\Jobs; 4 5use Illuminate\Contracts\Queue\ShouldQueue; 6use Illuminate\Foundation\Queue\Queueable; 7 8class ProcessPodcast implements ShouldQueue 9{10 use Queueable;11 12 /**13 * Create a new job instance.14 */15 public function __construct()16 {17 $this->onQueue('processing');18 }19}
調度到特定的連線
如果您的應用程式與多個佇列連線互動,您可以使用 onConnection
方法指定要將任務推送到哪個連線
1<?php 2 3namespace App\Http\Controllers; 4 5use App\Http\Controllers\Controller; 6use App\Jobs\ProcessPodcast; 7use App\Models\Podcast; 8use Illuminate\Http\RedirectResponse; 9use Illuminate\Http\Request;10 11class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);19 20 // Create podcast...21 22 ProcessPodcast::dispatch($podcast)->onConnection('sqs');23 24 return redirect('/podcasts');25 }26}
您可以將 onConnection
和 onQueue
方法鏈接在一起,以指定任務的連線和佇列
1ProcessPodcast::dispatch($podcast)2 ->onConnection('sqs')3 ->onQueue('processing');
或者,您也可以在任務的建構子中呼叫 onConnection
方法來指定任務的連線
1<?php 2 3namespace App\Jobs; 4 5use Illuminate\Contracts\Queue\ShouldQueue; 6use Illuminate\Foundation\Queue\Queueable; 7 8class ProcessPodcast implements ShouldQueue 9{10 use Queueable;11 12 /**13 * Create a new job instance.14 */15 public function __construct()16 {17 $this->onConnection('sqs');18 }19}
指定最大任務嘗試次數 / 超時值
最大嘗試次數
如果您的排隊任務之一遇到錯誤,您可能不希望它無限期地重試。因此,Laravel 提供了各種方法來指定任務可以嘗試多少次或多長時間。
指定任務可以嘗試的最大次數的一種方法是透過 Artisan 命令列上的 --tries
切換器。除非正在處理的任務指定了它可以嘗試的次數,否則這將適用於 worker 處理的所有任務
1php artisan queue:work --tries=3
如果任務超過其最大嘗試次數,它將被視為「失敗」的任務。有關處理失敗任務的更多資訊,請參閱失敗任務文件。如果為 queue:work
命令提供了 --tries=0
,則任務將無限期地重試。
您可以採用更細緻的方法,在任務類別本身上定義任務可以嘗試的最大次數。如果在任務上指定了最大嘗試次數,它將優先於命令列上提供的 --tries
值
1<?php 2 3namespace App\Jobs; 4 5class ProcessPodcast implements ShouldQueue 6{ 7 /** 8 * The number of times the job may be attempted. 9 *10 * @var int11 */12 public $tries = 5;13}
如果您需要動態控制特定任務的最大嘗試次數,您可以在任務上定義一個 tries
方法
1/**2 * Determine number of times the job may be attempted.3 */4public function tries(): int5{6 return 5;7}
基於時間的嘗試
作為定義任務在失敗之前可以嘗試多少次數的替代方法,您可以定義一個時間,在該時間之後任務不應再被嘗試。這允許任務在給定的時間範圍內嘗試任意次數。要定義任務不應再被嘗試的時間,請將 retryUntil
方法添加到您的任務類別中。此方法應傳回 DateTime
實例
1use DateTime;2 3/**4 * Determine the time at which the job should timeout.5 */6public function retryUntil(): DateTime7{8 return now()->addMinutes(10);9}
您也可以在您的排隊事件監聽器上定義 tries
屬性或 retryUntil
方法。
最大異常數
有時您可能希望指定任務可以嘗試多次,但如果重試是由給定數量的未處理異常觸發的(而不是由 release
方法直接釋放),則應該失敗。要完成此操作,您可以在您的任務類別上定義一個 maxExceptions
屬性
1<?php 2 3namespace App\Jobs; 4 5use Illuminate\Support\Facades\Redis; 6 7class ProcessPodcast implements ShouldQueue 8{ 9 /**10 * The number of times the job may be attempted.11 *12 * @var int13 */14 public $tries = 25;15 16 /**17 * The maximum number of unhandled exceptions to allow before failing.18 *19 * @var int20 */21 public $maxExceptions = 3;22 23 /**24 * Execute the job.25 */26 public function handle(): void27 {28 Redis::throttle('key')->allow(10)->every(60)->then(function () {29 // Lock obtained, process the podcast...30 }, function () {31 // Unable to obtain lock...32 return $this->release(10);33 });34 }35}
在此範例中,如果應用程式無法取得 Redis 鎖定,則任務將被釋放十秒鐘,並將繼續重試最多 25 次。但是,如果任務拋出三個未處理的異常,則任務將失敗。
逾時
通常,您大致知道您期望排隊任務花費多長時間。因此,Laravel 允許您指定「逾時」值。預設情況下,逾時值為 60 秒。如果任務處理時間超過逾時值指定的秒數,則處理該任務的 worker 將會錯誤退出。通常,worker 會由您伺服器上設定的程序管理器自動重新啟動。
可以使用 Artisan 命令列上的 --timeout
切換器來指定任務可以運行的最大秒數
1php artisan queue:work --timeout=30
如果任務因持續逾時而超過其最大嘗試次數,則會將其標記為失敗。
您也可以在任務類別本身上定義允許任務運行的最大秒數。如果在任務上指定了逾時,它將優先於命令列上指定的任何逾時
1<?php 2 3namespace App\Jobs; 4 5class ProcessPodcast implements ShouldQueue 6{ 7 /** 8 * The number of seconds the job can run before timing out. 9 *10 * @var int11 */12 public $timeout = 120;13}
有時,IO 阻塞程序(例如 socket 或傳出的 HTTP 連線)可能不遵守您指定的逾時。因此,當使用這些功能時,您應該始終嘗試也使用它們的 API 指定逾時。例如,當使用 Guzzle 時,您應該始終指定連線和請求逾時值。
必須安裝 pcntl
PHP 擴充功能才能指定任務逾時。此外,任務的「逾時」值應始終小於其「重試間隔」值。否則,任務可能會在實際完成執行或逾時之前被重新嘗試。
逾時時失敗
如果您想指示任務在逾時時應標記為失敗,您可以在任務類別上定義 $failOnTimeout
屬性
1/**2 * Indicate if the job should be marked as failed on timeout.3 *4 * @var bool5 */6public $failOnTimeout = true;
錯誤處理
如果在處理任務時拋出異常,則任務將自動釋放回佇列,以便可以再次嘗試。任務將繼續釋放,直到它已達到您的應用程式允許的最大嘗試次數。最大嘗試次數由 queue:work
Artisan 命令上使用的 --tries
切換器定義。或者,最大嘗試次數可以在任務類別本身上定義。有關運行佇列 worker 的更多資訊可以在下面找到。
手動釋放任務
有時您可能希望手動將任務釋放回佇列,以便稍後可以再次嘗試。您可以透過呼叫 release
方法來完成此操作
1/**2 * Execute the job.3 */4public function handle(): void5{6 // ...7 8 $this->release();9}
預設情況下,release
方法會將任務釋放回佇列以立即處理。但是,您可以指示佇列在給定秒數過去後才使任務可用於處理,方法是將整數或日期實例傳遞給 release
方法
1$this->release(10);2 3$this->release(now()->addSeconds(10));
手動使任務失敗
有時您可能需要手動將任務標記為「失敗」。要做到這一點,您可以呼叫 fail
方法
1/**2 * Execute the job.3 */4public function handle(): void5{6 // ...7 8 $this->fail();9}
如果您想因為您捕獲到的異常而將任務標記為失敗,您可以將異常傳遞給 fail
方法。或者,為了方便起見,您可以傳遞一個字串錯誤訊息,該訊息將為您轉換為異常
1$this->fail($exception);2 3$this->fail('Something went wrong.');
有關失敗任務的更多資訊,請查看有關處理任務失敗的文件。
任務批次處理
Laravel 的任務批次處理功能允許您輕鬆執行一批任務,然後在該批任務完成執行時執行某些操作。在開始之前,您應該建立一個資料庫遷移,以建立一個表,該表將包含有關您的任務批次的元資訊,例如其完成百分比。可以使用 make:queue-batches-table
Artisan 命令產生此遷移
1php artisan make:queue-batches-table2 3php artisan migrate
定義可批次處理的任務
要定義可批次處理的任務,您應該像往常一樣建立可排隊的任務;但是,您應該將 Illuminate\Bus\Batchable
trait 添加到任務類別中。此 trait 提供對 batch
方法的存取,該方法可用於檢索任務正在其中執行的目前批次
1<?php 2 3namespace App\Jobs; 4 5use Illuminate\Bus\Batchable; 6use Illuminate\Contracts\Queue\ShouldQueue; 7use Illuminate\Foundation\Queue\Queueable; 8 9class ImportCsv implements ShouldQueue10{11 use Batchable, Queueable;12 13 /**14 * Execute the job.15 */16 public function handle(): void17 {18 if ($this->batch()->cancelled()) {19 // Determine if the batch has been cancelled...20 21 return;22 }23 24 // Import a portion of the CSV file...25 }26}
調度批次
要調度一批任務,您應該使用 Bus
facade 的 batch
方法。當然,批次處理主要在與完成回呼結合使用時才有用。因此,您可以使用 then
、catch
和 finally
方法來定義批次的完成回呼。當調用這些回呼時,每個回呼都將接收一個 Illuminate\Bus\Batch
實例。在此範例中,我們將想像我們正在佇列一批任務,每個任務都處理來自 CSV 檔案的給定行數
1use App\Jobs\ImportCsv; 2use Illuminate\Bus\Batch; 3use Illuminate\Support\Facades\Bus; 4use Throwable; 5 6$batch = Bus::batch([ 7 new ImportCsv(1, 100), 8 new ImportCsv(101, 200), 9 new ImportCsv(201, 300),10 new ImportCsv(301, 400),11 new ImportCsv(401, 500),12])->before(function (Batch $batch) {13 // The batch has been created but no jobs have been added...14})->progress(function (Batch $batch) {15 // A single job has completed successfully...16})->then(function (Batch $batch) {17 // All jobs completed successfully...18})->catch(function (Batch $batch, Throwable $e) {19 // First batch job failure detected...20})->finally(function (Batch $batch) {21 // The batch has finished executing...22})->dispatch();23 24return $batch->id;
批次的 ID 可以透過 $batch->id
屬性存取,可用於在任務調度後查詢 Laravel 命令總線以取得有關批次的資訊。
由於批次回呼是由 Laravel 佇列序列化並稍後執行的,因此您不應該在回呼中使用 $this
變數。此外,由於批次任務被包裝在資料庫交易中,因此不應在任務中執行觸發隱式提交的資料庫語句。
命名批次
如果批次被命名,某些工具(例如 Laravel Horizon 和 Laravel Telescope)可能會為批次提供更人性化的偵錯資訊。要為批次分配任意名稱,您可以在定義批次時呼叫 name
方法
1$batch = Bus::batch([2 // ...3])->then(function (Batch $batch) {4 // All jobs completed successfully...5})->name('Import CSV')->dispatch();
批次連線和佇列
如果您想指定批次任務應該使用的連線和佇列,可以使用 onConnection
和 onQueue
方法。所有批次任務都必須在相同的連線和佇列中執行
1$batch = Bus::batch([2 // ...3])->then(function (Batch $batch) {4 // All jobs completed successfully...5})->onConnection('redis')->onQueue('imports')->dispatch();
鏈和批次
您可以透過將鏈接任務放在陣列中,在批次中定義一組鏈接任務。例如,我們可以並行執行兩個任務鏈,並在兩個任務鏈都完成處理後執行回呼
1use App\Jobs\ReleasePodcast; 2use App\Jobs\SendPodcastReleaseNotification; 3use Illuminate\Bus\Batch; 4use Illuminate\Support\Facades\Bus; 5 6Bus::batch([ 7 [ 8 new ReleasePodcast(1), 9 new SendPodcastReleaseNotification(1),10 ],11 [12 new ReleasePodcast(2),13 new SendPodcastReleaseNotification(2),14 ],15])->then(function (Batch $batch) {16 // ...17})->dispatch();
相反地,您可以透過在鏈中定義批次,在鏈中運行批次任務。例如,您可以先運行一批任務來發布多個 podcast,然後運行一批任務來發送發布通知
1use App\Jobs\FlushPodcastCache; 2use App\Jobs\ReleasePodcast; 3use App\Jobs\SendPodcastReleaseNotification; 4use Illuminate\Support\Facades\Bus; 5 6Bus::chain([ 7 new FlushPodcastCache, 8 Bus::batch([ 9 new ReleasePodcast(1),10 new ReleasePodcast(2),11 ]),12 Bus::batch([13 new SendPodcastReleaseNotification(1),14 new SendPodcastReleaseNotification(2),15 ]),16])->dispatch();
將任務新增到批次
有時,從批次任務中將其他任務添加到批次中可能很有用。當您需要批次處理數千個任務時,此模式可能很有用,這些任務可能需要很長時間才能在 Web 請求期間調度。因此,相反,您可能希望調度一批初始「載入器」任務,這些任務會用更多任務來填充批次
1$batch = Bus::batch([2 new LoadImportBatch,3 new LoadImportBatch,4 new LoadImportBatch,5])->then(function (Batch $batch) {6 // All jobs completed successfully...7})->name('Import Contacts')->dispatch();
在此範例中,我們將使用 LoadImportBatch
任務來用其他任務填充批次。要完成此操作,我們可以使用批次實例上的 add
方法,該實例可以透過任務的 batch
方法存取
1use App\Jobs\ImportContacts; 2use Illuminate\Support\Collection; 3 4/** 5 * Execute the job. 6 */ 7public function handle(): void 8{ 9 if ($this->batch()->cancelled()) {10 return;11 }12 13 $this->batch()->add(Collection::times(1000, function () {14 return new ImportContacts;15 }));16}
您只能從屬於同一批次的任務中將任務添加到批次中。
檢查批次
提供給批次完成回呼的 Illuminate\Bus\Batch
實例具有各種屬性和方法,可協助您與給定批次任務互動和檢查
1// The UUID of the batch... 2$batch->id; 3 4// The name of the batch (if applicable)... 5$batch->name; 6 7// The number of jobs assigned to the batch... 8$batch->totalJobs; 9 10// The number of jobs that have not been processed by the queue...11$batch->pendingJobs;12 13// The number of jobs that have failed...14$batch->failedJobs;15 16// The number of jobs that have been processed thus far...17$batch->processedJobs();18 19// The completion percentage of the batch (0-100)...20$batch->progress();21 22// Indicates if the batch has finished executing...23$batch->finished();24 25// Cancel the execution of the batch...26$batch->cancel();27 28// Indicates if the batch has been cancelled...29$batch->cancelled();
從路由傳回批次
所有 Illuminate\Bus\Batch
實例都是 JSON 可序列化的,這表示您可以直接從應用程式的路由之一傳回它們,以檢索包含有關批次的資訊的 JSON payload,包括其完成進度。這使得在應用程式的 UI 中顯示有關批次完成進度的資訊變得方便。
要依其 ID 檢索批次,您可以使用 Bus
facade 的 findBatch
方法
1use Illuminate\Support\Facades\Bus;2use Illuminate\Support\Facades\Route;3 4Route::get('/batch/{batchId}', function (string $batchId) {5 return Bus::findBatch($batchId);6});
取消批次
有時您可能需要取消給定批次的執行。可以透過呼叫 Illuminate\Bus\Batch
實例上的 cancel
方法來完成此操作
1/** 2 * Execute the job. 3 */ 4public function handle(): void 5{ 6 if ($this->user->exceedsImportLimit()) { 7 return $this->batch()->cancel(); 8 } 9 10 if ($this->batch()->cancelled()) {11 return;12 }13}
正如您可能在先前的範例中注意到的,批次任務通常應在繼續執行之前確定其對應的批次是否已取消。但是,為了方便起見,您可以將 SkipIfBatchCancelled
middleware 分配給任務。顧名思義,如果其對應的批次已取消,此 middleware 將指示 Laravel 不要處理該任務
1use Illuminate\Queue\Middleware\SkipIfBatchCancelled;2 3/**4 * Get the middleware the job should pass through.5 */6public function middleware(): array7{8 return [new SkipIfBatchCancelled];9}
批次失敗
當批次任務失敗時,將調用 catch
回呼(如果已分配)。此回呼僅針對批次中第一個失敗的任務調用。
允許失敗
當批次中的任務失敗時,Laravel 將自動將批次標記為「已取消」。如果您願意,您可以停用此行為,以便任務失敗不會自動將批次標記為已取消。可以透過在調度批次時呼叫 allowFailures
方法來完成此操作
1$batch = Bus::batch([2 // ...3])->then(function (Batch $batch) {4 // All jobs completed successfully...5})->allowFailures()->dispatch();
重試失敗的批次任務
為了方便起見,Laravel 提供了一個 queue:retry-batch
Artisan 命令,允許您輕鬆重試給定批次的所有失敗任務。queue:retry-batch
命令接受批次的 UUID,其失敗任務應重試
1php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5
修剪批次
如果不進行修剪,job_batches
表可能會非常快速地累積記錄。為了緩解這個問題,您應該排程 queue:prune-batches
Artisan 命令每天運行
1use Illuminate\Support\Facades\Schedule;2 3Schedule::command('queue:prune-batches')->daily();
預設情況下,所有超過 24 小時的已完成批次都將被修剪。當呼叫命令時,您可以使用 hours
選項來決定要保留批次資料多長時間。例如,以下命令將刪除所有超過 48 小時前完成的批次
1use Illuminate\Support\Facades\Schedule;2 3Schedule::command('queue:prune-batches --hours=48')->daily();
有時,您的 jobs_batches
表可能會累積永遠未成功完成的批次的批次記錄,例如任務失敗且該任務從未成功重試的批次。您可以使用 unfinished
選項指示 queue:prune-batches
命令修剪這些未完成的批次記錄
1use Illuminate\Support\Facades\Schedule;2 3Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();
同樣地,您的 jobs_batches
表也可能會累積已取消批次的批次記錄。您可以使用 cancelled
選項指示 queue:prune-batches
命令修剪這些已取消的批次記錄
1use Illuminate\Support\Facades\Schedule;2 3Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();
在 DynamoDB 中儲存批次
Laravel 也支援將批次元資訊儲存在 DynamoDB 中,而不是關聯式資料庫。但是,您需要手動建立 DynamoDB 表來儲存所有批次記錄。
通常,此表應命名為 job_batches
,但您應根據應用程式 queue
設定檔中 queue.batching.table
設定值來命名表。
DynamoDB 批次表配置
job_batches
表應具有一個名為 application
的字串主分割區鍵和一個名為 id
的字串主排序鍵。鍵的 application
部分將包含您的應用程式名稱,如應用程式 app
設定檔中的 name
設定值所定義。由於應用程式名稱是 DynamoDB 表鍵的一部分,因此您可以使用相同的表來儲存多個 Laravel 應用程式的任務批次。
此外,如果您想利用自動批次修剪,您可以為您的表定義 ttl
屬性。
DynamoDB 配置
接下來,安裝 AWS SDK,以便您的 Laravel 應用程式可以與 Amazon DynamoDB 通訊
1composer require aws/aws-sdk-php
然後,將 queue.batching.driver
配置選項的值設定為 dynamodb
。此外,您應在 batching
配置陣列中定義 key
、secret
和 region
配置選項。這些選項將用於向 AWS 驗證身分。當使用 dynamodb
驅動程式時,queue.batching.database
配置選項是不必要的
1'batching' => [2 'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),3 'key' => env('AWS_ACCESS_KEY_ID'),4 'secret' => env('AWS_SECRET_ACCESS_KEY'),5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),6 'table' => 'job_batches',7],
在 DynamoDB 中修剪批次
當使用 DynamoDB 儲存任務批次資訊時,用於修剪儲存在關聯式資料庫中的批次的典型修剪命令將無法運作。相反地,您可以使用 DynamoDB 的原生 TTL 功能來自動移除舊批次的記錄。
如果您使用 ttl
屬性定義了 DynamoDB 表,您可以定義配置參數,以指示 Laravel 如何修剪批次記錄。queue.batching.ttl_attribute
配置值定義了保存 TTL 的屬性名稱,而 queue.batching.ttl
配置值定義了在批次記錄可以從 DynamoDB 表中移除之前的秒數,相對於上次更新記錄的時間
1'batching' => [2 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),3 'key' => env('AWS_ACCESS_KEY_ID'),4 'secret' => env('AWS_SECRET_ACCESS_KEY'),5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),6 'table' => 'job_batches',7 'ttl_attribute' => 'ttl',8 'ttl' => 60 * 60 * 24 * 7, // 7 days...9],
佇列閉包
除了將任務類別調度到佇列之外,您還可以調度閉包。這對於需要在目前請求週期之外執行的快速、簡單任務非常有用。當將閉包調度到佇列時,閉包的程式碼內容會經過加密簽名,使其在傳輸過程中無法被修改
1$podcast = App\Podcast::find(1);2 3dispatch(function () use ($podcast) {4 $podcast->publish();5});
使用 catch
方法,您可以提供一個閉包,如果排隊的閉包在耗盡佇列的所有配置的重試嘗試後未能成功完成,則應執行該閉包
1use Throwable;2 3dispatch(function () use ($podcast) {4 $podcast->publish();5})->catch(function (Throwable $e) {6 // This job has failed...7});
由於 catch
回呼是由 Laravel 佇列序列化並稍後執行的,因此您不應在 catch
回呼中使用 $this
變數。
執行佇列工作程序
queue:work
命令
Laravel 包含一個 Artisan 命令,它將啟動佇列 worker 並在新任務被推送到佇列時處理它們。您可以使用 queue:work
Artisan 命令運行 worker。請注意,一旦 queue:work
命令啟動,它將繼續運行,直到手動停止或您關閉終端機
1php artisan queue:work
為了使 queue:work
程序在後台永久運行,您應該使用程序監視器(例如Supervisor)來確保佇列 worker 不會停止運行。
如果您希望處理後的任務 ID 包含在命令的輸出中,您可以在調用 queue:work
命令時包含 -v
標誌
1php artisan queue:work -v
請記住,佇列 worker 是長時間運行的程序,並將啟動的應用程式狀態儲存在記憶體中。因此,它們在啟動後不會注意到程式碼庫中的變更。因此,在您的部署過程中,請務必重新啟動您的佇列 worker。此外,請記住,您的應用程式建立或修改的任何靜態狀態都不會在任務之間自動重置。
或者,您可以運行 queue:listen
命令。當使用 queue:listen
命令時,當您想要重新載入更新後的程式碼或重置應用程式狀態時,您不必手動重新啟動 worker;但是,此命令的效率遠低於 queue:work
命令
1php artisan queue:listen
運行多個佇列 Worker
要為佇列分配多個 worker 並同時處理任務,您應該只需啟動多個 queue:work
程序。這可以在本地透過終端機中的多個標籤完成,也可以在生產環境中使用程序管理器的配置設定完成。當使用 Supervisor 時,您可以使用 numprocs
配置值。
指定連線和佇列
您也可以指定 worker 應使用的佇列連線。傳遞給 work
命令的連線名稱應對應於您的 config/queue.php
設定檔中定義的連線之一
1php artisan queue:work redis
預設情況下,queue:work
命令僅處理給定連線上預設佇列的任務。但是,您可以透過僅處理給定連線的特定佇列來進一步自訂您的佇列 worker。例如,如果您的所有電子郵件都在 redis
佇列連線上的 emails
佇列中處理,您可以發出以下命令來啟動僅處理該佇列的 worker
1php artisan queue:work redis --queue=emails
處理指定數量的任務
可以使用 --once
選項來指示 worker 僅處理佇列中的單個任務
1php artisan queue:work --once
可以使用 --max-jobs
選項來指示 worker 處理給定數量的任務,然後退出。當與Supervisor結合使用時,此選項可能很有用,以便您的 worker 在處理給定數量的任務後自動重新啟動,釋放它們可能累積的任何記憶體
1php artisan queue:work --max-jobs=1000
處理所有排隊任務然後退出
可以使用 --stop-when-empty
選項來指示 worker 處理所有任務,然後優雅地退出。如果您希望在 Docker 容器中處理 Laravel 佇列,並且希望在佇列為空後關閉容器,則此選項可能很有用
1php artisan queue:work --stop-when-empty
處理給定秒數的任務
可以使用 --max-time
選項來指示 worker 處理給定秒數的任務,然後退出。當與Supervisor結合使用時,此選項可能很有用,以便您的 worker 在處理給定時間量的任務後自動重新啟動,釋放它們可能累積的任何記憶體
1# Process jobs for one hour and then exit...2php artisan queue:work --max-time=3600
Worker 休眠持續時間
當佇列上有可用任務時,worker 將繼續處理任務,任務之間沒有延遲。但是,sleep
選項決定了在沒有可用任務時 worker 將「休眠」多少秒。當然,在休眠時,worker 將不會處理任何新任務
1php artisan queue:work --sleep=3
維護模式和佇列
當您的應用程式處於維護模式時,將不會處理任何排隊任務。一旦應用程式退出維護模式,任務將繼續正常處理。
要強制您的佇列 worker 即使在啟用維護模式的情況下也處理任務,您可以使用 --force
選項
1php artisan queue:work --force
資源考量
Daemon 佇列 worker 不會在處理每個任務之前「重新啟動」框架。因此,您應該在每個任務完成後釋放任何繁重的資源。例如,如果您正在使用 GD 庫進行圖像處理,則應在使用完圖像後使用 imagedestroy
釋放記憶體。
佇列優先順序
有時您可能希望優先處理佇列的方式。例如,在您的 config/queue.php
設定檔中,您可以將 redis
連線的預設 queue
設定為 low
。但是,有時您可能希望將任務推送到 high
優先順序佇列,如下所示
1dispatch((new Job)->onQueue('high'));
要啟動一個 worker,該 worker 驗證所有 high
佇列任務都已處理完畢,然後再繼續處理 low
佇列上的任何任務,請將以逗號分隔的佇列名稱列表傳遞給 work
命令
1php artisan queue:work --queue=high,low
佇列工作程序和部署
由於佇列 worker 是長時間運行的程序,因此它們在重新啟動之前不會注意到您的程式碼變更。因此,使用佇列 worker 部署應用程式的最簡單方法是在部署過程中重新啟動 worker。您可以透過發出 queue:restart
命令優雅地重新啟動所有 worker
1php artisan queue:restart
此命令將指示所有佇列 worker 在完成處理目前任務後優雅地退出,以便不會遺失任何現有任務。由於佇列 worker 將在執行 queue:restart
命令時退出,因此您應該運行程序管理器(例如Supervisor)以自動重新啟動佇列 worker。
佇列使用快取來儲存重新啟動信號,因此您應該在使用此功能之前驗證是否為您的應用程式正確配置了快取驅動程式。
任務過期和超時
任務過期
在您的 config/queue.php
設定檔中,每個佇列連線都定義了一個 retry_after
選項。此選項指定佇列連線在重試正在處理的任務之前應等待多少秒。例如,如果 retry_after
的值設定為 90
,則如果任務已處理 90 秒而未被釋放或刪除,則該任務將被釋放回佇列。通常,您應該將 retry_after
值設定為您的任務合理完成處理所需的最大秒數。
唯一不包含 retry_after
值的佇列連線是 Amazon SQS。SQS 將根據 預設可見性逾時重試任務,該逾時在 AWS 控制台中管理。
Worker 逾時
queue:work
Artisan 命令公開了 --timeout
選項。預設情況下,--timeout
值為 60 秒。如果任務處理時間超過逾時值指定的秒數,則處理該任務的 worker 將會錯誤退出。通常,worker 會由您伺服器上設定的程序管理器自動重新啟動
1php artisan queue:work --timeout=60
retry_after
配置選項和 --timeout
CLI 選項是不同的,但它們協同工作以確保任務不會遺失,並且任務僅成功處理一次。
--timeout
值應始終比您的 retry_after
配置值至少短幾秒鐘。這將確保在重試任務之前始終終止處理凍結任務的 worker。如果您的 --timeout
選項長於您的 retry_after
配置值,您的任務可能會被處理兩次。
Supervisor 設定
在生產環境中,您需要一種方法來保持您的 queue:work
程序運行。queue:work
程序可能會因各種原因停止運行,例如超出 worker 逾時或執行 queue:restart
命令。
因此,您需要配置一個程序監視器,它可以偵測到您的 queue:work
程序何時退出並自動重新啟動它們。此外,程序監視器還可以讓您指定您想要同時運行的 queue:work
程序數量。Supervisor 是一種程序監視器,通常在 Linux 環境中使用,我們將在以下文件中討論如何配置它。
安裝 Supervisor
Supervisor 是 Linux 作業系統的程序監視器,如果您的 queue:work
程序失敗,它將自動重新啟動它們。要在 Ubuntu 上安裝 Supervisor,您可以使用以下命令
1sudo apt-get install supervisor
如果自行配置和管理 Supervisor 聽起來令人感到不知所措,請考慮使用 Laravel Cloud,它提供了一個完全託管的平台來運行 Laravel 佇列 worker。
配置 Supervisor
Supervisor 配置檔案通常儲存在 /etc/supervisor/conf.d
目錄中。在此目錄中,您可以建立任意數量的設定檔,這些設定檔指示 supervisor 應如何監控您的程序。例如,讓我們建立一個 laravel-worker.conf
檔案,該檔案啟動並監控 queue:work
程序
1[program:laravel-worker] 2process_name=%(program_name)s_%(process_num)02d 3command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600 4autostart=true 5autorestart=true 6stopasgroup=true 7killasgroup=true 8user=forge 9numprocs=810redirect_stderr=true11stdout_logfile=/home/forge/app.com/worker.log12stopwaitsecs=3600
在此範例中,numprocs
指令將指示 Supervisor 運行八個 queue:work
程序並監控所有程序,如果它們失敗則自動重新啟動它們。您應該變更配置的 command
指令,以反映您想要的佇列連線和 worker 選項。
您應確保 stopwaitsecs
的值大於您運行時間最長的任務所消耗的秒數。否則,Supervisor 可能會在任務完成處理之前終止該任務。
啟動 Supervisor
建立設定檔後,您可以更新 Supervisor 配置並使用以下命令啟動程序
1sudo supervisorctl reread2 3sudo supervisorctl update4 5sudo supervisorctl start "laravel-worker:*"
有關 Supervisor 的更多資訊,請參閱Supervisor 文件。
處理失敗的任務
有時候您佇列中的任務會失敗。別擔心,事情不總是如預期般進行!Laravel 提供一個方便的方式來指定任務應該嘗試的最大次數。在非同步任務超過嘗試次數上限後,它將會被插入到 failed_jobs
資料庫表格中。同步調度的任務若失敗,則不會儲存在此表格中,且它們的例外會立即由應用程式處理。
在新的 Laravel 應用程式中,通常已存在建立 failed_jobs
表格的 migration 檔案。然而,如果您的應用程式沒有此表格的 migration 檔案,您可以使用 make:queue-failed-table
命令來建立 migration 檔案。
1php artisan make:queue-failed-table2 3php artisan migrate
當執行佇列 worker 程序時,您可以使用 queue:work
命令的 --tries
參數來指定任務應該嘗試的最大次數。如果您沒有為 --tries
選項指定值,任務將只會嘗試一次,或者依照任務類別的 $tries
屬性所指定的次數嘗試。
1php artisan queue:work redis --tries=3
使用 --backoff
選項,您可以指定 Laravel 在重試遇到例外狀況的任務之前應該等待多少秒。預設情況下,任務會立即被放回佇列中,以便再次嘗試。
1php artisan queue:work redis --tries=3 --backoff=3
如果您想針對每個任務設定 Laravel 在重試遇到例外狀況的任務之前應該等待多少秒,您可以透過在您的任務類別中定義 backoff
屬性來達成。
1/**2 * The number of seconds to wait before retrying the job.3 *4 * @var int5 */6public $backoff = 3;
如果您需要更複雜的邏輯來決定任務的 backoff 時間,您可以在您的任務類別中定義 backoff
方法。
1/**2 * Calculate the number of seconds to wait before retrying the job.3 */4public function backoff(): int5{6 return 3;7}
您可以透過從 backoff
方法中回傳一個 backoff 值的陣列,輕鬆設定「指數式」backoff。在這個範例中,重試延遲時間將會是第一次重試 1 秒、第二次重試 5 秒、第三次重試 10 秒,以及之後每次重試(如果還有剩餘嘗試次數)都是 10 秒。
1/**2 * Calculate the number of seconds to wait before retrying the job.3 *4 * @return array<int, int>5 */6public function backoff(): array7{8 return [1, 5, 10];9}
清理失敗任務後的殘留
當特定任務失敗時,您可能想要發送警報給您的使用者,或是還原任務部分完成的任何動作。為了達成這個目的,您可以在您的任務類別中定義一個 failed
方法。造成任務失敗的 Throwable
實例將會被傳遞給 failed
方法。
1<?php 2 3namespace App\Jobs; 4 5use App\Models\Podcast; 6use App\Services\AudioProcessor; 7use Illuminate\Contracts\Queue\ShouldQueue; 8use Illuminate\Foundation\Queue\Queueable; 9use Throwable;10 11class ProcessPodcast implements ShouldQueue12{13 use Queueable;14 15 /**16 * Create a new job instance.17 */18 public function __construct(19 public Podcast $podcast,20 ) {}21 22 /**23 * Execute the job.24 */25 public function handle(AudioProcessor $processor): void26 {27 // Process uploaded podcast...28 }29 30 /**31 * Handle a job failure.32 */33 public function failed(?Throwable $exception): void34 {35 // Send user notification of failure, etc...36 }37}
在調用 failed
方法之前,會先實例化一個新的任務實例;因此,在 handle
方法中可能發生的任何類別屬性修改都將會遺失。
重試失敗的任務
若要檢視所有已插入到您的 failed_jobs
資料庫表格中的失敗任務,您可以使用 queue:failed
Artisan 命令。
1php artisan queue:failed
queue:failed
命令將會列出任務 ID、連線、佇列、失敗時間,以及關於任務的其他資訊。任務 ID 可以用來重試失敗的任務。例如,要重試 ID 為 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
的失敗任務,請執行以下命令。
1php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
如果有需要,您可以將多個 ID 傳遞給此命令。
1php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d
您也可以重試特定佇列的所有失敗任務。
1php artisan queue:retry --queue=name
若要重試您所有的失敗任務,請執行 queue:retry
命令並傳遞 all
作為 ID。
1php artisan queue:retry all
如果您想要刪除一個失敗的任務,您可以使用 queue:forget
命令。
1php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
當使用 Horizon 時,您應該使用 horizon:forget
命令來刪除失敗的任務,而不是 queue:forget
命令。
若要從 failed_jobs
表格中刪除您所有的失敗任務,您可以使用 queue:flush
命令。
1php artisan queue:flush
忽略遺失的模型
當將 Eloquent 模型注入到任務中時,該模型會在放入佇列之前自動序列化,並在任務處理時從資料庫重新取回。然而,如果模型在任務等待 worker 處理期間被刪除,您的任務可能會因 ModelNotFoundException
而失敗。
為了方便起見,您可以選擇將任務的 deleteWhenMissingModels
屬性設定為 true
,來自動刪除模型遺失的任務。當此屬性設定為 true
時,Laravel 將會靜默地捨棄該任務,而不會引發例外。
1/**2 * Delete the job if its models no longer exist.3 *4 * @var bool5 */6public $deleteWhenMissingModels = true;
修剪失敗的任務
您可以透過調用 queue:prune-failed
Artisan 命令,來修剪您的應用程式 failed_jobs
表格中的記錄。
1php artisan queue:prune-failed
預設情況下,所有超過 24 小時的失敗任務記錄都會被修剪。如果您提供 --hours
選項給命令,則只會保留最近 N 小時內插入的失敗任務記錄。例如,以下命令將會刪除所有在 48 小時前插入的失敗任務記錄。
1php artisan queue:prune-failed --hours=48
在 DynamoDB 中儲存失敗的任務
Laravel 也支援將您的失敗任務記錄儲存在 DynamoDB 中,而不是關聯式資料庫表格。然而,您必須手動建立一個 DynamoDB 表格來儲存所有失敗的任務記錄。通常,這個表格應該命名為 failed_jobs
,但您應該根據應用程式 queue
設定檔中 queue.failed.table
設定值來命名表格。
failed_jobs
表格應該有一個名為 application
的字串主分割鍵和一個名為 uuid
的字串主排序鍵。鍵的 application
部分將包含您的應用程式名稱,該名稱由您的應用程式 app
設定檔中的 name
設定值所定義。由於應用程式名稱是 DynamoDB 表格鍵的一部分,您可以使用同一個表格來儲存多個 Laravel 應用程式的失敗任務。
此外,請確保您已安裝 AWS SDK,以便您的 Laravel 應用程式可以與 Amazon DynamoDB 通訊。
1composer require aws/aws-sdk-php
接下來,將 queue.failed.driver
設定選項的值設定為 dynamodb
。此外,您應該在失敗任務設定陣列中定義 key
、secret
和 region
設定選項。這些選項將用於向 AWS 驗證身分。當使用 dynamodb
driver 時,queue.failed.database
設定選項是不必要的。
1'failed' => [2 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),3 'key' => env('AWS_ACCESS_KEY_ID'),4 'secret' => env('AWS_SECRET_ACCESS_KEY'),5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),6 'table' => 'failed_jobs',7],
停用失敗任務儲存
您可以透過將 queue.failed.driver
設定選項的值設定為 null
,指示 Laravel 捨棄失敗的任務而不儲存它們。通常,這可以透過 QUEUE_FAILED_DRIVER
環境變數來完成。
1QUEUE_FAILED_DRIVER=null
失敗任務事件
如果您想要註冊一個事件監聽器,在任務失敗時被調用,您可以使用 Queue
facade 的 failing
方法。例如,我們可以從 Laravel 包含的 AppServiceProvider
的 boot
方法中,將一個 closure 附加到這個事件。
1<?php 2 3namespace App\Providers; 4 5use Illuminate\Support\Facades\Queue; 6use Illuminate\Support\ServiceProvider; 7use Illuminate\Queue\Events\JobFailed; 8 9class AppServiceProvider extends ServiceProvider10{11 /**12 * Register any application services.13 */14 public function register(): void15 {16 // ...17 }18 19 /**20 * Bootstrap any application services.21 */22 public function boot(): void23 {24 Queue::failing(function (JobFailed $event) {25 // $event->connectionName26 // $event->job27 // $event->exception28 });29 }30}
從佇列中清除任務
當使用 Horizon 時,您應該使用 horizon:clear
命令從佇列中清除任務,而不是 queue:clear
命令。
如果您想要從預設連線的預設佇列中刪除所有任務,您可以使用 queue:clear
Artisan 命令來完成。
1php artisan queue:clear
您也可以提供 connection
參數和 queue
選項,從特定的連線和佇列中刪除任務。
1php artisan queue:clear redis --queue=emails
從佇列中清除任務僅適用於 SQS、Redis 和 database 佇列 driver。此外,SQS 訊息刪除過程最多需要 60 秒,因此在您清除佇列後 60 秒內發送到 SQS 佇列的任務也可能被刪除。
監控您的佇列
如果您的佇列收到大量湧入的任務,它可能會變得不堪負荷,導致任務完成的等待時間過長。如果您希望,當您的佇列任務計數超過指定閾值時,Laravel 可以發出警報通知您。
要開始使用,您應該排程 queue:monitor
命令每分鐘執行一次。此命令接受您想要監控的佇列名稱以及您期望的任務計數閾值。
1php artisan queue:monitor redis:default,redis:deployments --max=100
單獨排程此命令不足以觸發通知來警告您佇列的過載狀態。當命令遇到任務計數超過您閾值的佇列時,將會發送 Illuminate\Queue\Events\QueueBusy
事件。您可以在應用程式的 AppServiceProvider
中監聽此事件,以便向您或您的開發團隊發送通知。
1use App\Notifications\QueueHasLongWaitTime; 2use Illuminate\Queue\Events\QueueBusy; 3use Illuminate\Support\Facades\Event; 4use Illuminate\Support\Facades\Notification; 5 6/** 7 * Bootstrap any application services. 8 */ 9public function boot(): void10{11 Event::listen(function (QueueBusy $event) {13 ->notify(new QueueHasLongWaitTime(14 $event->connection,15 $event->queue,16 $event->size17 ));18 });19}
測試
當測試調度任務的程式碼時,您可能希望指示 Laravel 不要實際執行任務本身,因為任務的程式碼可以與調度它的程式碼分開直接測試。當然,要測試任務本身,您可以實例化一個任務實例,並在您的測試中直接調用 handle
方法。
您可以使用 Queue
facade 的 fake
方法來防止佇列任務實際被推送到佇列。在調用 Queue
facade 的 fake
方法之後,您可以接著斷言應用程式嘗試將任務推送到佇列。
1<?php 2 3use App\Jobs\AnotherJob; 4use App\Jobs\FinalJob; 5use App\Jobs\ShipOrder; 6use Illuminate\Support\Facades\Queue; 7 8test('orders can be shipped', function () { 9 Queue::fake();10 11 // Perform order shipping...12 13 // Assert that no jobs were pushed...14 Queue::assertNothingPushed();15 16 // Assert a job was pushed to a given queue...17 Queue::assertPushedOn('queue-name', ShipOrder::class);18 19 // Assert a job was pushed twice...20 Queue::assertPushed(ShipOrder::class, 2);21 22 // Assert a job was not pushed...23 Queue::assertNotPushed(AnotherJob::class);24 25 // Assert that a Closure was pushed to the queue...26 Queue::assertClosurePushed();27 28 // Assert the total number of jobs that were pushed...29 Queue::assertCount(3);30});
1<?php 2 3namespace Tests\Feature; 4 5use App\Jobs\AnotherJob; 6use App\Jobs\FinalJob; 7use App\Jobs\ShipOrder; 8use Illuminate\Support\Facades\Queue; 9use Tests\TestCase;10 11class ExampleTest extends TestCase12{13 public function test_orders_can_be_shipped(): void14 {15 Queue::fake();16 17 // Perform order shipping...18 19 // Assert that no jobs were pushed...20 Queue::assertNothingPushed();21 22 // Assert a job was pushed to a given queue...23 Queue::assertPushedOn('queue-name', ShipOrder::class);24 25 // Assert a job was pushed twice...26 Queue::assertPushed(ShipOrder::class, 2);27 28 // Assert a job was not pushed...29 Queue::assertNotPushed(AnotherJob::class);30 31 // Assert that a Closure was pushed to the queue...32 Queue::assertClosurePushed();33 34 // Assert the total number of jobs that were pushed...35 Queue::assertCount(3);36 }37}
您可以將一個 closure 傳遞給 assertPushed
或 assertNotPushed
方法,以便斷言是否有任務被推送且通過給定的「真值測試」。如果至少有一個任務被推送且通過給定的真值測試,則斷言將會成功。
1Queue::assertPushed(function (ShipOrder $job) use ($order) {2 return $job->order->id === $order->id;3});
偽造任務子集
如果您只需要 fake 特定任務,同時允許其他任務正常執行,您可以將應該被 fake 的任務類別名稱傳遞給 fake
方法。
1test('orders can be shipped', function () { 2 Queue::fake([ 3 ShipOrder::class, 4 ]); 5 6 // Perform order shipping... 7 8 // Assert a job was pushed twice... 9 Queue::assertPushed(ShipOrder::class, 2);10});
1public function test_orders_can_be_shipped(): void 2{ 3 Queue::fake([ 4 ShipOrder::class, 5 ]); 6 7 // Perform order shipping... 8 9 // Assert a job was pushed twice...10 Queue::assertPushed(ShipOrder::class, 2);11}
您可以使用 except
方法 fake 除了指定任務集合以外的所有任務。
1Queue::fake()->except([2 ShipOrder::class,3]);
測試任務鏈
若要測試任務鏈,您將需要利用 Bus
facade 的 faking 功能。Bus
facade 的 assertChained
方法可用於斷言是否已調度任務鏈。assertChained
方法接受一個鏈式任務陣列作為它的第一個參數。
1use App\Jobs\RecordShipment; 2use App\Jobs\ShipOrder; 3use App\Jobs\UpdateInventory; 4use Illuminate\Support\Facades\Bus; 5 6Bus::fake(); 7 8// ... 9 10Bus::assertChained([11 ShipOrder::class,12 RecordShipment::class,13 UpdateInventory::class14]);
如您在上面的範例中所見,鏈式任務的陣列可以是任務類別名稱的陣列。然而,您也可以提供實際任務實例的陣列。當這樣做時,Laravel 將確保任務實例與您的應用程式調度的鏈式任務屬於相同的類別,並且具有相同的屬性值。
1Bus::assertChained([2 new ShipOrder,3 new RecordShipment,4 new UpdateInventory,5]);
您可以使用 assertDispatchedWithoutChain
方法來斷言是否有任務在沒有任務鏈的情況下被推送。
1Bus::assertDispatchedWithoutChain(ShipOrder::class);
測試鏈式修改
如果鏈式任務在現有鏈中前置或附加任務,您可以使用任務的 assertHasChain
方法來斷言該任務是否具有預期的剩餘任務鏈。
1$job = new ProcessPodcast;2 3$job->handle();4 5$job->assertHasChain([6 new TranscribePodcast,7 new OptimizePodcast,8 new ReleasePodcast,9]);
assertDoesntHaveChain
方法可以用於斷言任務的剩餘鏈是否為空。
1$job->assertDoesntHaveChain();
測試鏈式批次
如果您的任務鏈包含一批任務,您可以透過在您的鏈式斷言中插入 Bus::chainedBatch
定義,來斷言鏈式批次是否符合您的期望。
1use App\Jobs\ShipOrder; 2use App\Jobs\UpdateInventory; 3use Illuminate\Bus\PendingBatch; 4use Illuminate\Support\Facades\Bus; 5 6Bus::assertChained([ 7 new ShipOrder, 8 Bus::chainedBatch(function (PendingBatch $batch) { 9 return $batch->jobs->count() === 3;10 }),11 new UpdateInventory,12]);
測試任務批次
Bus
facade 的 assertBatched
方法可用於斷言是否已調度一批任務。傳遞給 assertBatched
方法的 closure 會接收一個 Illuminate\Bus\PendingBatch
的實例,可用於檢查批次中的任務。
1use Illuminate\Bus\PendingBatch; 2use Illuminate\Support\Facades\Bus; 3 4Bus::fake(); 5 6// ... 7 8Bus::assertBatched(function (PendingBatch $batch) { 9 return $batch->name == 'import-csv' &&10 $batch->jobs->count() === 10;11});
您可以使用 assertBatchCount
方法來斷言是否已調度給定數量的批次。
1Bus::assertBatchCount(3);
您可以使用 assertNothingBatched
來斷言是否沒有調度任何批次。
1Bus::assertNothingBatched();
測試任務 / 批次互動
此外,您可能偶爾需要測試個別任務與其底層批次的互動。例如,您可能需要測試任務是否取消了其批次的進一步處理。為了完成此操作,您需要透過 withFakeBatch
方法將 fake 批次指派給任務。withFakeBatch
方法會回傳一個包含任務實例和 fake 批次的 tuple。
1[$job, $batch] = (new ShipOrder)->withFakeBatch();2 3$job->handle();4 5$this->assertTrue($batch->cancelled());6$this->assertEmpty($batch->added);
測試任務 / 佇列互動
有時候,您可能需要測試佇列任務是否將自己釋放回佇列。或者,您可能需要測試任務是否刪除了自己。您可以透過實例化任務並調用 withFakeQueueInteractions
方法來測試這些佇列互動。
一旦任務的佇列互動被 fake,您可以調用任務的 handle
方法。在調用任務之後,可以使用 assertReleased
、assertDeleted
、assertNotDeleted
、assertFailed
、assertFailedWith
和 assertNotFailed
方法來針對任務的佇列互動進行斷言。
1use App\Exceptions\CorruptedAudioException; 2use App\Jobs\ProcessPodcast; 3 4$job = (new ProcessPodcast)->withFakeQueueInteractions(); 5 6$job->handle(); 7 8$job->assertReleased(delay: 30); 9$job->assertDeleted();10$job->assertNotDeleted();11$job->assertFailed();12$job->assertFailedWith(CorruptedAudioException::class);13$job->assertNotFailed();
任務事件
使用 Queue
facade 上的 before
和 after
方法,您可以指定在佇列任務處理之前或之後執行的回呼函數。這些回呼函數是執行額外日誌記錄或增加儀表板統計資訊的絕佳機會。通常,您應該從 服務提供者的 boot
方法中調用這些方法。例如,我們可以使用 Laravel 包含的 AppServiceProvider
。
1<?php 2 3namespace App\Providers; 4 5use Illuminate\Support\Facades\Queue; 6use Illuminate\Support\ServiceProvider; 7use Illuminate\Queue\Events\JobProcessed; 8use Illuminate\Queue\Events\JobProcessing; 9 10class AppServiceProvider extends ServiceProvider11{12 /**13 * Register any application services.14 */15 public function register(): void16 {17 // ...18 }19 20 /**21 * Bootstrap any application services.22 */23 public function boot(): void24 {25 Queue::before(function (JobProcessing $event) {26 // $event->connectionName27 // $event->job28 // $event->job->payload()29 });30 31 Queue::after(function (JobProcessed $event) {32 // $event->connectionName33 // $event->job34 // $event->job->payload()35 });36 }37}
使用 Queue
facade 上的 looping
方法,您可以指定在 worker 嘗試從佇列中獲取任務之前執行的回呼函數。例如,您可能會註冊一個 closure 來回滾先前失敗任務所遺留的任何未關閉的交易。
1use Illuminate\Support\Facades\DB;2use Illuminate\Support\Facades\Queue;3 4Queue::looping(function () {5 while (DB::transactionLevel() > 0) {6 DB::rollBack();7 }8});