佇列
簡介
在建立您的網頁應用程式時,您可能會有一些任務,例如剖析和儲存上傳的 CSV 檔案,這些任務在典型的網頁請求期間執行時間太長。幸運的是,Laravel 讓您可以輕鬆建立可在背景處理的佇列任務。透過將耗時的任務移至佇列,您的應用程式可以閃電般的速度回應網頁請求,並為您的客戶提供更好的使用者體驗。
Laravel 佇列在各種不同的佇列後端(例如 Amazon SQS、Redis 甚至關係資料庫)提供統一的佇列 API。
Laravel 的佇列設定選項儲存在您應用程式的 config/queue.php
設定檔中。在此檔案中,您會找到框架隨附的每個佇列驅動程式的連線設定,包括資料庫、Amazon SQS、Redis 和 Beanstalkd 驅動程式,以及將立即執行任務的同步驅動程式(用於本機開發期間)。還包含一個 null
佇列驅動程式,它會捨棄佇列任務。
Laravel 現在提供 Horizon,一個美觀的儀表板和針對您的 Redis 支援佇列的設定系統。查看完整的 Horizon 文件以取得更多資訊。
連線與佇列
在開始使用 Laravel 佇列之前,務必了解「連線」和「佇列」之間的區別。在您的 config/queue.php
設定檔中,有一個 connections
設定陣列。此選項定義與後端佇列服務(例如 Amazon SQS、Beanstalk 或 Redis)的連線。但是,任何給定的佇列連線都可能有多個「佇列」,這些佇列可以視為不同的堆疊或佇列任務堆。
請注意,queue
設定檔中的每個連線設定範例都包含一個 queue
屬性。這是當任務傳送到給定連線時,將派遣任務的預設佇列。換句話說,如果您派遣任務時未明確定義應該派遣到哪個佇列,則任務將放置在連線設定的 queue
屬性中定義的佇列上
use App\Jobs\ProcessPodcast; // This job is sent to the default connection's default queue...ProcessPodcast::dispatch(); // This job is sent to the default connection's "emails" queue...ProcessPodcast::dispatch()->onQueue('emails');
某些應用程式可能不需要將任務推送到多個佇列,而是傾向於只有一個簡單的佇列。但是,將任務推送到多個佇列對於希望優先處理或區隔任務處理方式的應用程式特別有用,因為 Laravel 佇列工作程式允許您指定它應該按優先順序處理哪些佇列。例如,如果您將任務推送到 high
佇列,您可以執行一個給予它們較高處理優先順序的工作程式
php artisan queue:work --queue=high,default
驅動程式注意事項和先決條件
資料庫
為了使用 database
佇列驅動程式,您需要一個資料庫表格來保存任務。通常,這包含在 Laravel 的預設 0001_01_01_000002_create_jobs_table.php
資料庫遷移中;但是,如果您的應用程式不包含此遷移,您可以使用 make:queue-table
Artisan 命令來建立它
php artisan make:queue-table php artisan migrate
Redis
為了使用 redis
佇列驅動程式,您應該在 config/database.php
設定檔中設定 Redis 資料庫連線。
redis
佇列驅動程式不支援 serializer
和 compression
Redis 選項。
Redis 叢集
如果您的 Redis 佇列連線使用 Redis 叢集,您的佇列名稱必須包含 金鑰雜湊標籤。這是為了確保給定佇列的所有 Redis 金鑰都放置在相同的雜湊槽中
'redis' => [ 'driver' => 'redis', 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'), 'queue' => env('REDIS_QUEUE', '{default}'), 'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90), 'block_for' => null, 'after_commit' => false,],
封鎖
當使用 Redis 佇列時,您可以使用 block_for
設定選項來指定驅動程式在反覆執行工作程式迴圈並重新輪詢 Redis 資料庫之前,應該等待任務可用的時間長度。
根據您的佇列負載調整此值,可能比持續輪詢 Redis 資料庫以取得新任務更有效率。例如,您可以將該值設定為 5
,表示驅動程式在等待任務可用時應封鎖五秒鐘
'redis' => [ 'driver' => 'redis', 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'), 'queue' => env('REDIS_QUEUE', 'default'), 'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90), 'block_for' => 5, 'after_commit' => false,],
將 block_for
設定為 0
將導致佇列工作程式無限期地封鎖,直到有任務可用為止。這也會阻止在處理下一個任務之前處理諸如 SIGTERM
之類的訊號。
其他驅動程式先決條件
下列佇列驅動程式需要下列相依性。這些相依性可以透過 Composer 套件管理器安裝
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~5.0
- Redis:
predis/predis ~2.0
或 phpredis PHP 擴充功能 -
MongoDB:
mongodb/laravel-mongodb
建立任務
產生任務類別
根據預設,您應用程式的所有可佇列任務都儲存在 app/Jobs
目錄中。如果 app/Jobs
目錄不存在,當您執行 make:job
Artisan 命令時,將會建立該目錄
php artisan make:job ProcessPodcast
產生的類別將實作 Illuminate\Contracts\Queue\ShouldQueue
介面,向 Laravel 指示該任務應推送到佇列以非同步執行。
可以使用 樁發佈來自訂任務樁。
類別結構
任務類別非常簡單,通常只包含一個 handle
方法,該方法會在任務被佇列處理時被調用。首先,讓我們先看一個任務類別的範例。在這個範例中,我們假設管理一個 Podcast 發布服務,需要在發布前處理上傳的 Podcast 檔案。
<?php namespace App\Jobs; use App\Models\Podcast;use App\Services\AudioProcessor;use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Foundation\Queue\Queueable; class ProcessPodcast implements ShouldQueue{ use Queueable; /** * Create a new job instance. */ public function __construct( public Podcast $podcast, ) {} /** * Execute the job. */ public function handle(AudioProcessor $processor): void { // Process uploaded podcast... }}
在這個範例中,請注意我們能夠直接將 Eloquent 模型 傳遞到佇列任務的建構子中。由於任務使用了 Queueable
trait,當任務處理時,Eloquent 模型及其載入的關聯會被優雅地序列化和反序列化。
如果您的佇列任務在其建構子中接收一個 Eloquent 模型,則只有該模型的識別碼會被序列化到佇列中。當任務實際處理時,佇列系統會自動從資料庫重新檢索完整的模型實例及其載入的關聯。這種模型序列化的方法允許將更小的任務負載發送到您的佇列驅動程式。
handle
方法的依賴注入
當任務被佇列處理時,會調用 handle
方法。請注意,我們可以在任務的 handle
方法上使用類型提示來注入依賴項。Laravel 的 服務容器 會自動注入這些依賴項。
如果您想要完全控制容器如何將依賴項注入到 handle
方法中,您可以使用容器的 bindMethod
方法。bindMethod
方法接收一個回呼函數,該回呼函數接收任務和容器。在回呼函數中,您可以隨意調用 handle
方法。通常,您應該從 App\Providers\AppServiceProvider
的 服務提供者 的 boot
方法中呼叫此方法。
use App\Jobs\ProcessPodcast;use App\Services\AudioProcessor;use Illuminate\Contracts\Foundation\Application; $this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) { return $job->handle($app->make(AudioProcessor::class));});
二進制數據,例如原始圖像內容,應在傳遞到佇列任務之前通過 base64_encode
函數進行處理。否則,任務在被放入佇列時可能無法正確序列化為 JSON。
佇列關係
由於所有已載入的 Eloquent 模型關係也會在任務被加入佇列時進行序列化,因此序列化的任務字串有時可能會變得相當大。此外,當任務被反序列化且模型關係從資料庫重新檢索時,它們將被完整地檢索。在任務加入佇列過程中模型被序列化之前應用的任何先前關係約束,在任務反序列化時將不會被應用。因此,如果您希望使用給定關係的子集,您應該在佇列任務中重新約束該關係。
或者,為了防止關係被序列化,您可以在設定屬性值時在模型上呼叫 withoutRelations
方法。此方法將返回一個不包含其已載入關係的模型實例。
/** * Create a new job instance. */public function __construct( Podcast $podcast,) { $this->podcast = $podcast->withoutRelations();}
如果您使用 PHP 建構子屬性提升並想要指示 Eloquent 模型不應序列化其關係,您可以使用 WithoutRelations
屬性。
use Illuminate\Queue\Attributes\WithoutRelations; /** * Create a new job instance. */public function __construct( #[WithoutRelations] public Podcast $podcast,) {}
如果任務接收到一個 Eloquent 模型集合或陣列而不是單個模型,則該集合中的模型在任務反序列化和執行時將不會恢復其關係。這是為了防止處理大量模型的任務過度使用資源。
唯一任務
唯一任務需要一個支援鎖定的快取驅動程式。目前,memcached
、redis
、dynamodb
、database
、file
和 array
快取驅動程式支援原子鎖定。此外,唯一任務約束不適用於批次中的任務。
有時,您可能希望確保在任何時間點佇列中只有一個特定任務的實例。您可以在您的任務類別中實作 ShouldBeUnique
介面來實現此目的。此介面不需要您在類別上定義任何其他方法。
<?php use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Contracts\Queue\ShouldBeUnique; class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique{ ...}
在上面的範例中,UpdateSearchIndex
任務是唯一的。因此,如果另一個任務實例已經在佇列中且尚未完成處理,則不會發送該任務。
在某些情況下,您可能想要定義一個使任務唯一的特定「鍵」,或者您可能想要指定一個超時時間,超過該時間後任務將不再保持唯一性。為了實現這一點,您可以在您的任務類別中定義 uniqueId
和 uniqueFor
屬性或方法。
<?php use App\Models\Product;use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Contracts\Queue\ShouldBeUnique; class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique{ /** * The product instance. * * @var \App\Product */ public $product; /** * The number of seconds after which the job's unique lock will be released. * * @var int */ public $uniqueFor = 3600; /** * Get the unique ID for the job. */ public function uniqueId(): string { return $this->product->id; }}
在上面的範例中,UpdateSearchIndex
任務按產品 ID 是唯一的。因此,任何使用相同產品 ID 的新任務發送都將被忽略,直到現有的任務完成處理。此外,如果現有任務在一小時內沒有處理完畢,則唯一鎖定將被釋放,並且可以將另一個具有相同唯一鍵的任務發送到佇列。
如果您的應用程式從多個 Web 伺服器或容器發送任務,您應該確保所有伺服器都與同一個中央快取伺服器通信,以便 Laravel 可以準確判斷任務是否唯一。
在處理開始之前保持任務唯一性
預設情況下,唯一任務會在任務完成處理或所有重試嘗試失敗後「解鎖」。但是,在某些情況下,您可能希望您的任務在處理之前立即解鎖。為了實現這一點,您的任務應實作 ShouldBeUniqueUntilProcessing
合約,而不是 ShouldBeUnique
合約。
<?php use App\Models\Product;use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing; class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing{ // ...}
唯一任務鎖定
在幕後,當發送 ShouldBeUnique
任務時,Laravel 會嘗試使用 uniqueId
鍵獲取一個 鎖定。如果未獲取鎖定,則不會發送該任務。當任務完成處理或所有重試嘗試失敗時,此鎖定會被釋放。預設情況下,Laravel 將使用預設的快取驅動程式來獲取此鎖定。但是,如果您希望使用另一個驅動程式來獲取鎖定,您可以定義一個返回應使用的快取驅動程式的 uniqueVia
方法。
use Illuminate\Contracts\Cache\Repository;use Illuminate\Support\Facades\Cache; class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique{ ... /** * Get the cache driver for the unique job lock. */ public function uniqueVia(): Repository { return Cache::driver('redis'); }}
如果您只需要限制任務的並行處理,請改用 WithoutOverlapping
任務中介軟體。
加密任務
Laravel 允許您通過 加密 來確保任務數據的隱私性和完整性。首先,只需將 ShouldBeEncrypted
介面添加到任務類別中。一旦將此介面添加到類別中,Laravel 會在將任務推送到佇列之前自動加密您的任務。
<?php use Illuminate\Contracts\Queue\ShouldBeEncrypted;use Illuminate\Contracts\Queue\ShouldQueue; class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted{ // ...}
任務中介層
任務中介軟體允許您在佇列任務的執行周圍包裝自定義邏輯,從而減少任務本身的樣板程式碼。例如,考慮以下 handle
方法,它利用 Laravel 的 Redis 速率限制功能,允許每五秒鐘只處理一個任務。
use Illuminate\Support\Facades\Redis; /** * Execute the job. */public function handle(): void{ Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () { info('Lock obtained...'); // Handle job... }, function () { // Could not obtain lock... return $this->release(5); });}
雖然此程式碼有效,但由於 handle
方法被 Redis 速率限制邏輯所雜亂,因此其實現變得混亂。此外,對於我們想要限制速率的任何其他任務,都必須重複此速率限制邏輯。
我們可以定義一個處理速率限制的任務中介軟體,而不是在 handle 方法中進行速率限制。Laravel 沒有任務中介軟體的預設位置,因此您可以將任務中介軟體放置在應用程式的任何位置。在本範例中,我們將把中介軟體放置在 app/Jobs/Middleware
目錄中。
<?php namespace App\Jobs\Middleware; use Closure;use Illuminate\Support\Facades\Redis; class RateLimited{ /** * Process the queued job. * * @param \Closure(object): void $next */ public function handle(object $job, Closure $next): void { Redis::throttle('key') ->block(0)->allow(1)->every(5) ->then(function () use ($job, $next) { // Lock obtained... $next($job); }, function () use ($job) { // Could not obtain lock... $job->release(5); }); }}
如您所見,與 路由中介軟體 一樣,任務中介軟體接收正在處理的任務以及應調用以繼續處理任務的回呼函數。
建立任務中介軟體後,可以通過從任務的 middleware
方法返回它們將它們附加到任務。此方法不存在於由 make:job
Artisan 命令建立的任務中,因此您需要手動將其添加到您的任務類別中。
use App\Jobs\Middleware\RateLimited; /** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [new RateLimited];}
任務中介軟體也可以分配給可佇列的事件偵聽器、可郵寄的電子郵件和通知。
速率限制
儘管我們剛剛示範了如何編寫您自己的速率限制任務中介軟體,但 Laravel 實際上包含一個您可以利用來限制任務速率的速率限制中介軟體。與 路由速率限制器 一樣,任務速率限制器是使用 RateLimiter
facade 的 for
方法定義的。
例如,您可能希望允許使用者每小時備份一次數據,而對高級客戶不施加此限制。為了實現這一點,您可以在 AppServiceProvider
的 boot
方法中定義一個 RateLimiter
。
use Illuminate\Cache\RateLimiting\Limit;use Illuminate\Support\Facades\RateLimiter; /** * Bootstrap any application services. */public function boot(): void{ RateLimiter::for('backups', function (object $job) { return $job->user->vipCustomer() ? Limit::none() : Limit::perHour(1)->by($job->user->id); });}
在上面的範例中,我們定義了一個每小時的速率限制;但是,您可以使用 perMinute
方法輕鬆定義基於分鐘的速率限制。此外,您可以將任何您想要的值傳遞給速率限制的 by
方法;但是,此值最常用於按客戶劃分速率限制。
return Limit::perMinute(50)->by($job->user->id);
定義速率限制後,您可以使用 Illuminate\Queue\Middleware\RateLimited
中介軟體將速率限制器附加到您的任務。每次任務超過速率限制時,此中介軟體會根據速率限制持續時間將任務釋放回佇列,並給予適當的延遲。
use Illuminate\Queue\Middleware\RateLimited; /** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [new RateLimited('backups')];}
將速率限制的任務釋放回佇列仍會增加任務的 attempts
總數。您可能希望相應地調整任務類別上的 tries
和 maxExceptions
屬性。或者,您可能希望使用 retryUntil
方法 來定義任務不應再嘗試的時間量。
如果您不希望在任務受到速率限制時重試該任務,可以使用 dontRelease
方法。
/** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [(new RateLimited('backups'))->dontRelease()];}
如果您使用 Redis,您可以使用 Illuminate\Queue\Middleware\RateLimitedWithRedis
中介軟體,該中介軟體是為 Redis 精心調整的,並且比基本的速率限制中介軟體更有效。
防止任務重疊
Laravel 包含一個 Illuminate\Queue\Middleware\WithoutOverlapping
中介軟體,允許您根據任意鍵來防止任務重疊。當佇列任務正在修改一個應一次只能由一個任務修改的資源時,這可能很有用。
例如,假設您有一個佇列任務,用於更新使用者的信用評分,並且您想要防止同一使用者 ID 的信用評分更新任務重疊。為了實現這一點,您可以從任務的 middleware
方法返回 WithoutOverlapping
中介軟體。
use Illuminate\Queue\Middleware\WithoutOverlapping; /** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [new WithoutOverlapping($this->user->id)];}
任何相同類型的重疊任務都將被釋放回佇列。您還可以指定釋放的任務必須經過多少秒才會再次嘗試。
/** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];}
如果您希望立即刪除任何重疊的任務,以便它們不會重試,可以使用 dontRelease
方法。
/** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [(new WithoutOverlapping($this->order->id))->dontRelease()];}
WithoutOverlapping
中介層由 Laravel 的原子鎖定功能提供支援。有時,您的任務可能會意外失敗或逾時,導致鎖定無法釋放。因此,您可以使用 expireAfter
方法明確定義鎖定的到期時間。例如,以下範例會指示 Laravel 在任務開始處理三分鐘後釋放 WithoutOverlapping
鎖定。
/** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];}
WithoutOverlapping
中介層需要支援鎖定的快取驅動程式。目前,memcached
、redis
、dynamodb
、database
、file
和 array
快取驅動程式都支援原子鎖定。
在任務類別之間共用鎖定鍵
預設情況下,WithoutOverlapping
中介層只會防止相同類別的任務重疊。因此,即使兩個不同的任務類別使用相同的鎖定鍵,它們也不會被防止重疊。但是,您可以使用 shared
方法指示 Laravel 將該鍵應用於所有任務類別。
use Illuminate\Queue\Middleware\WithoutOverlapping; class ProviderIsDown{ // ... public function middleware(): array { return [ (new WithoutOverlapping("status:{$this->provider}"))->shared(), ]; }} class ProviderIsUp{ // ... public function middleware(): array { return [ (new WithoutOverlapping("status:{$this->provider}"))->shared(), ]; }}
節流異常
Laravel 包含一個 Illuminate\Queue\Middleware\ThrottlesExceptions
中介層,可讓您限制異常的發生次數。一旦任務拋出指定次數的異常,所有進一步嘗試執行該任務的行為都會延遲到指定的間隔時間過後。此中介層對於與不穩定的第三方服務互動的任務特別有用。
例如,假設有一個佇列任務與開始拋出異常的第三方 API 互動。為了限制異常,您可以從任務的 middleware
方法傳回 ThrottlesExceptions
中介層。通常,此中介層應與實作基於時間的嘗試的任務配對使用。
use DateTime;use Illuminate\Queue\Middleware\ThrottlesExceptions; /** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [new ThrottlesExceptions(10, 5 * 60)];} /** * Determine the time at which the job should timeout. */public function retryUntil(): DateTime{ return now()->addMinutes(30);}
中介層接受的第一個建構子參數是任務在被限制之前可以拋出的異常次數,而第二個建構子參數則是任務被限制後應經過的秒數,然後再嘗試執行任務。在上面的程式碼範例中,如果任務連續拋出 10 個異常,我們將等待 5 分鐘,然後再嘗試執行該任務,並受限於 30 分鐘的時間限制。
當任務拋出異常但尚未達到異常閾值時,任務通常會立即重試。但是,您可以在將中介層附加到任務時呼叫 backoff
方法,以指定此類任務應延遲的分鐘數。
use Illuminate\Queue\Middleware\ThrottlesExceptions; /** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];}
在內部,此中介層使用 Laravel 的快取系統來實作速率限制,並使用任務的類別名稱作為快取「鍵」。您可以在將中介層附加到您的任務時,呼叫 by
方法來覆寫此鍵。如果您有多個任務與相同的第三方服務互動,並且您希望它們共用一個通用的限制「桶」,這可能會很有用。
use Illuminate\Queue\Middleware\ThrottlesExceptions; /** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];}
預設情況下,此中介層將限制每個異常。您可以透過在將中介層附加到您的任務時呼叫 when
方法來修改此行為。只有在提供給 when
方法的閉包傳回 true
時,才會限制異常。
use Illuminate\Http\Client\HttpClientException;use Illuminate\Queue\Middleware\ThrottlesExceptions; /** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [(new ThrottlesExceptions(10, 10 * 60))->when( fn (Throwable $throwable) => $throwable instanceof HttpClientException )];}
如果您希望將受限制的異常報告給您應用程式的異常處理程序,您可以在將中介層附加到您的任務時呼叫 report
方法來執行此操作。您可以選擇性地為 report
方法提供閉包,並且只有在給定的閉包傳回 true
時才會報告異常。
use Illuminate\Http\Client\HttpClientException;use Illuminate\Queue\Middleware\ThrottlesExceptions; /** * Get the middleware the job should pass through. * * @return array<int, object> */public function middleware(): array{ return [(new ThrottlesExceptions(10, 10 * 60))->report( fn (Throwable $throwable) => $throwable instanceof HttpClientException )];}
如果您使用 Redis,可以使用 Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis
中介層,該中介層針對 Redis 進行了微調,並且比基本異常限制中介層更有效率。
跳過任務
Skip
中介層可讓您指定應略過/刪除任務,而無需修改任務的邏輯。如果給定條件評估為 true
,則 Skip::when
方法會刪除任務;如果條件評估為 false
,則 Skip::unless
方法會刪除任務。
use Illuminate\Queue\Middleware\Skip; /*** Get the middleware the job should pass through.*/public function middleware(): array{ return [ Skip::when($someCondition), ];}
您也可以將 Closure
傳遞給 when
和 unless
方法,以進行更複雜的條件評估。
use Illuminate\Queue\Middleware\Skip; /*** Get the middleware the job should pass through.*/public function middleware(): array{ return [ Skip::when(function (): bool { return $this->shouldSkip(); }), ];}
派遣任務
一旦您編寫了任務類別,您可以使用任務本身的 dispatch
方法來分派它。傳遞給 dispatch
方法的引數將會提供給任務的建構子。
<?php namespace App\Http\Controllers; use App\Http\Controllers\Controller;use App\Jobs\ProcessPodcast;use App\Models\Podcast;use Illuminate\Http\RedirectResponse;use Illuminate\Http\Request; class PodcastController extends Controller{ /** * Store a new podcast. */ public function store(Request $request): RedirectResponse { $podcast = Podcast::create(/* ... */); // ... ProcessPodcast::dispatch($podcast); return redirect('/podcasts'); }}
如果您想要有條件地分派任務,可以使用 dispatchIf
和 dispatchUnless
方法。
ProcessPodcast::dispatchIf($accountActive, $podcast); ProcessPodcast::dispatchUnless($accountSuspended, $podcast);
在新的 Laravel 應用程式中,sync
驅動程式是預設的佇列驅動程式。此驅動程式在目前請求的前景中同步執行任務,這在本地開發期間通常很方便。如果您想實際開始將任務放入佇列以進行背景處理,可以在您應用程式的 config/queue.php
組態檔中指定不同的佇列驅動程式。
延遲派遣
如果您想指定某個任務不應立即提供給佇列工作者處理,您可以在分派任務時使用 delay
方法。例如,讓我們指定任務在分派後 10 分鐘內不可用於處理。
<?php namespace App\Http\Controllers; use App\Http\Controllers\Controller;use App\Jobs\ProcessPodcast;use App\Models\Podcast;use Illuminate\Http\RedirectResponse;use Illuminate\Http\Request; class PodcastController extends Controller{ /** * Store a new podcast. */ public function store(Request $request): RedirectResponse { $podcast = Podcast::create(/* ... */); // ... ProcessPodcast::dispatch($podcast) ->delay(now()->addMinutes(10)); return redirect('/podcasts'); }}
在某些情況下,任務可能設定了預設的延遲時間。如果您需要略過此延遲並立即分派任務進行處理,可以使用 withoutDelay
方法。
ProcessPodcast::dispatch($podcast)->withoutDelay();
Amazon SQS 佇列服務的最大延遲時間為 15 分鐘。
在回應傳送到瀏覽器後分派
或者,如果您的網頁伺服器使用 FastCGI,dispatchAfterResponse
方法會延遲分派任務,直到 HTTP 回應傳送到使用者的瀏覽器之後。即使佇列任務仍在執行,這仍會讓使用者開始使用應用程式。這通常只應適用於大約需要一秒鐘的任務,例如傳送電子郵件。由於它們是在目前的 HTTP 請求中處理的,因此以這種方式分派的任務不需要執行佇列工作者才能處理它們。
use App\Jobs\SendNotification; SendNotification::dispatchAfterResponse();
您也可以 dispatch
閉包,並將 afterResponse
方法鏈結到 dispatch
輔助函式,以便在 HTTP 回應傳送到瀏覽器後執行閉包。
use App\Mail\WelcomeMessage;use Illuminate\Support\Facades\Mail; dispatch(function () {})->afterResponse();
同步派遣
如果您想立即 (同步) 分派任務,可以使用 dispatchSync
方法。當使用此方法時,任務不會放入佇列,而會在目前的處理程序中立即執行。
<?php namespace App\Http\Controllers; use App\Http\Controllers\Controller;use App\Jobs\ProcessPodcast;use App\Models\Podcast;use Illuminate\Http\RedirectResponse;use Illuminate\Http\Request; class PodcastController extends Controller{ /** * Store a new podcast. */ public function store(Request $request): RedirectResponse { $podcast = Podcast::create(/* ... */); // Create podcast... ProcessPodcast::dispatchSync($podcast); return redirect('/podcasts'); }}
任務與資料庫交易
雖然在資料庫交易中分派任務是完全沒問題的,但您應特別注意確保您的任務實際上能夠成功執行。當在交易中分派任務時,任務有可能在父交易提交之前由工作者處理。當發生這種情況時,您在資料庫交易期間對模型或資料庫記錄所做的任何更新可能尚未反映在資料庫中。此外,在交易中建立的任何模型或資料庫記錄可能不存在於資料庫中。
值得慶幸的是,Laravel 提供了幾種解決此問題的方法。首先,您可以在佇列連線的組態陣列中設定 after_commit
連線選項。
'redis' => [ 'driver' => 'redis', // ... 'after_commit' => true,],
當 after_commit
選項為 true
時,您可以在資料庫交易中分派任務;但是,Laravel 會等待直到開啟的父資料庫交易提交後,才會實際分派任務。當然,如果目前沒有開啟的資料庫交易,任務將會立即分派。
如果交易因交易期間發生的異常而回滾,則在該交易期間分派的任務將會被捨棄。
將 after_commit
組態選項設定為 true
也會導致任何放入佇列的事件偵聽器、可郵件、通知和廣播事件在所有開啟的資料庫交易提交後才分派。
指定內嵌提交分派行為
如果您未將 after_commit
佇列連線組態選項設定為 true
,您仍然可以指示在所有開啟的資料庫交易提交後才應分派特定的任務。若要完成此操作,您可以將 afterCommit
方法鏈結到您的分派操作。
use App\Jobs\ProcessPodcast; ProcessPodcast::dispatch($podcast)->afterCommit();
同樣地,如果 after_commit
組態選項設定為 true
,您可以指示應立即分派特定任務,而無需等待任何開啟的資料庫交易提交。
ProcessPodcast::dispatch($podcast)->beforeCommit();
任務鏈
任務鏈結可讓您指定一個佇列任務清單,這些任務應在主要任務成功執行後按順序執行。如果序列中的一個任務失敗,則其餘任務將不會執行。若要執行佇列任務鏈結,您可以使用 Bus
Facade 提供的 chain
方法。Laravel 的命令匯流排是一個較低層級的元件,佇列任務分派是建立在其之上的。
use App\Jobs\OptimizePodcast;use App\Jobs\ProcessPodcast;use App\Jobs\ReleasePodcast;use Illuminate\Support\Facades\Bus; Bus::chain([ new ProcessPodcast, new OptimizePodcast, new ReleasePodcast,])->dispatch();
除了鏈結任務類別實例之外,您也可以鏈結閉包。
Bus::chain([ new ProcessPodcast, new OptimizePodcast, function () { Podcast::update(/* ... */); },])->dispatch();
在任務中使用 $this->delete()
方法刪除任務不會阻止鏈結任務被處理。只有當鏈中的任務失敗時,鏈才會停止執行。
鏈結連線和佇列
如果您想指定鏈結任務應使用的連線和佇列,可以使用 onConnection
和 onQueue
方法。這些方法指定應使用的佇列連線和佇列名稱,除非佇列任務明確指派了不同的連線/佇列。
Bus::chain([ new ProcessPodcast, new OptimizePodcast, new ReleasePodcast,])->onConnection('redis')->onQueue('podcasts')->dispatch();
將任務新增至鏈結
有時,您可能需要從該鏈中的另一個任務中,將任務前置或附加到現有的任務鏈結。您可以使用 prependToChain
和 appendToChain
方法來完成此操作。
/** * Execute the job. */public function handle(): void{ // ... // Prepend to the current chain, run job immediately after current job... $this->prependToChain(new TranscribePodcast); // Append to the current chain, run job at end of chain... $this->appendToChain(new TranscribePodcast);}
鏈結失敗
在鏈結任務時,您可以使用 catch
方法來指定如果鏈中的任務失敗時應叫用的閉包。給定的回呼將接收導致任務失敗的 Throwable
實例。
use Illuminate\Support\Facades\Bus;use Throwable; Bus::chain([ new ProcessPodcast, new OptimizePodcast, new ReleasePodcast,])->catch(function (Throwable $e) { // A job within the chain has failed...})->dispatch();
由於鏈結回呼是由 Laravel 佇列序列化並稍後執行,因此您不應在鏈結回呼中使用 $this
變數。
自訂佇列和連線
分派到特定佇列
透過將任務推送到不同的佇列,您可以「分類」您的佇列任務,甚至可以優先處理您指派給各種佇列的工作者數量。請記住,這不會將任務推送到您的佇列組態檔定義的不同佇列「連線」,而只會推送到單一連線內的特定佇列。若要指定佇列,請在分派任務時使用 onQueue
方法。
<?php namespace App\Http\Controllers; use App\Http\Controllers\Controller;use App\Jobs\ProcessPodcast;use App\Models\Podcast;use Illuminate\Http\RedirectResponse;use Illuminate\Http\Request; class PodcastController extends Controller{ /** * Store a new podcast. */ public function store(Request $request): RedirectResponse { $podcast = Podcast::create(/* ... */); // Create podcast... ProcessPodcast::dispatch($podcast)->onQueue('processing'); return redirect('/podcasts'); }}
或者,您也可以在任務的建構子中呼叫 onQueue
方法來指定任務的佇列。
<?php namespace App\Jobs; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Queue\Queueable; class ProcessPodcast implements ShouldQueue{ use Queueable; /** * Create a new job instance. */ public function __construct() { $this->onQueue('processing'); }}
分派到特定連線
如果您的應用程式與多個佇列連線互動,可以使用 onConnection
方法指定將任務推送到的連線。
<?php namespace App\Http\Controllers; use App\Http\Controllers\Controller;use App\Jobs\ProcessPodcast;use App\Models\Podcast;use Illuminate\Http\RedirectResponse;use Illuminate\Http\Request; class PodcastController extends Controller{ /** * Store a new podcast. */ public function store(Request $request): RedirectResponse { $podcast = Podcast::create(/* ... */); // Create podcast... ProcessPodcast::dispatch($podcast)->onConnection('sqs'); return redirect('/podcasts'); }}
您可以使用鏈式調用的方式,將 onConnection
和 onQueue
方法串接起來,以指定任務的連線和佇列。
ProcessPodcast::dispatch($podcast) ->onConnection('sqs') ->onQueue('processing');
或者,您也可以在任務的建構子中呼叫 onConnection
方法來指定任務的連線。
<?php namespace App\Jobs; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Queue\Queueable; class ProcessPodcast implements ShouldQueue{ use Queueable; /** * Create a new job instance. */ public function __construct() { $this->onConnection('sqs'); }}
指定最大任務嘗試次數 / 超時值
最大嘗試次數
如果您的某個佇列任務遇到錯誤,您可能不希望它無限次重試。因此,Laravel 提供了多種方式來指定任務可以嘗試的次數或時間長度。
一種指定任務最大嘗試次數的方法是透過 Artisan 命令列的 --tries
參數。這將適用於 worker 處理的所有任務,除非正在處理的任務本身指定了它允許嘗試的次數。
php artisan queue:work --tries=3
如果任務超過其最大嘗試次數,它將被視為「失敗」的任務。有關處理失敗任務的更多資訊,請參閱失敗任務的相關文件。如果向 queue:work
命令提供 --tries=0
,則任務將無限次重試。
您可以更細緻地控制任務的最大嘗試次數,方法是在任務類別本身定義。如果在任務上指定了最大嘗試次數,它將優先於命令列提供的 --tries
值。
<?php namespace App\Jobs; class ProcessPodcast implements ShouldQueue{ /** * The number of times the job may be attempted. * * @var int */ public $tries = 5;}
如果您需要動態控制特定任務的最大嘗試次數,您可以在任務上定義一個 tries
方法。
/** * Determine number of times the job may be attempted. */public function tries(): int{ return 5;}
基於時間的嘗試
除了定義任務在失敗之前可以嘗試的次數之外,您還可以定義任務應該不再嘗試的時間。這允許任務在給定的時間範圍內嘗試任意次數。若要定義任務不再嘗試的時間,請在您的任務類別中新增一個 retryUntil
方法。此方法應傳回 DateTime
實例。
use DateTime; /** * Determine the time at which the job should timeout. */public function retryUntil(): DateTime{ return now()->addMinutes(10);}
您也可以在您的佇列事件監聽器上定義 tries
屬性或 retryUntil
方法。
最大例外次數
有時您可能希望指定一個任務可以嘗試多次,但如果重試是由給定數量的未處理例外觸發的(而不是直接由 release
方法釋放),則應失敗。要實現此目的,您可以在任務類別上定義一個 maxExceptions
屬性。
<?php namespace App\Jobs; use Illuminate\Support\Facades\Redis; class ProcessPodcast implements ShouldQueue{ /** * The number of times the job may be attempted. * * @var int */ public $tries = 25; /** * The maximum number of unhandled exceptions to allow before failing. * * @var int */ public $maxExceptions = 3; /** * Execute the job. */ public function handle(): void { Redis::throttle('key')->allow(10)->every(60)->then(function () { // Lock obtained, process the podcast... }, function () { // Unable to obtain lock... return $this->release(10); }); }}
在此範例中,如果應用程式無法取得 Redis 鎖定,則任務將釋放十秒鐘,並將繼續重試最多 25 次。但是,如果任務拋出三個未處理的例外,則任務將失敗。
逾時
通常,您大概知道您預期佇列任務需要多長時間。因此,Laravel 允許您指定「逾時」值。預設情況下,逾時值為 60 秒。如果任務處理時間超過逾時值指定的秒數,則處理該任務的 worker 將會因錯誤而退出。通常,worker 將會由您伺服器上設定的程序管理器自動重新啟動。
可以使用 Artisan 命令列的 --timeout
參數來指定任務可以執行的最大秒數。
php artisan queue:work --timeout=30
如果任務因持續逾時而超過其最大嘗試次數,則將被標記為失敗。
您也可以在任務類別本身定義任務允許執行的最大秒數。如果在任務上指定了逾時時間,它將優先於命令列上指定的任何逾時時間。
<?php namespace App\Jobs; class ProcessPodcast implements ShouldQueue{ /** * The number of seconds the job can run before timing out. * * @var int */ public $timeout = 120;}
有時,IO 阻塞程序(例如 sockets 或外發的 HTTP 連線)可能不會遵守您指定的逾時時間。因此,當使用這些功能時,您也應該始終嘗試使用它們的 API 來指定逾時時間。例如,當使用 Guzzle 時,您應該始終指定連線和請求的逾時值。
必須安裝 pcntl
PHP 擴展才能指定任務逾時。此外,任務的「逾時」值應始終小於其「重試後」值。否則,任務可能會在實際完成執行或逾時之前被重新嘗試。
逾時時失敗
如果您想表示在逾時時應將任務標記為失敗,您可以在任務類別上定義 $failOnTimeout
屬性。
/** * Indicate if the job should be marked as failed on timeout. * * @var bool */public $failOnTimeout = true;
錯誤處理
如果在處理任務時拋出例外,則該任務將自動釋放回佇列,以便再次嘗試。該任務將繼續被釋放,直到它已嘗試應用程式允許的最大次數。最大嘗試次數由 queue:work
Artisan 命令上使用的 --tries
參數定義。或者,最大嘗試次數也可以在任務類別本身定義。有關執行佇列 worker 的更多資訊,可以在下方找到。
手動釋放任務
有時您可能希望手動將任務釋放回佇列,以便稍後再次嘗試。您可以使用呼叫 release
方法來完成此操作。
/** * Execute the job. */public function handle(): void{ // ... $this->release();}
預設情況下,release
方法會將任務釋放回佇列以立即處理。但是,您可以指示佇列在經過給定的秒數之前不讓任務可供處理,方法是將整數或日期實例傳遞給 release
方法。
$this->release(10); $this->release(now()->addSeconds(10));
手動使任務失敗
有時您可能需要手動將任務標記為「失敗」。為此,您可以呼叫 fail
方法。
/** * Execute the job. */public function handle(): void{ // ... $this->fail();}
如果您想因為捕獲到的例外而將任務標記為失敗,您可以將例外傳遞給 fail
方法。或者,為了方便起見,您可以傳遞一個字串錯誤訊息,它將為您轉換為例外。
$this->fail($exception); $this->fail('Something went wrong.');
有關失敗任務的更多資訊,請查看有關處理任務失敗的文件。
任務批次
Laravel 的任務批次處理功能允許您輕鬆執行一批任務,然後在該批任務完成執行時執行某些操作。在開始之前,您應該建立資料庫遷移,以建立一個表,其中將包含有關您的任務批次的元數據資訊,例如它們的完成百分比。可以使用 make:queue-batches-table
Artisan 命令來產生此遷移。
php artisan make:queue-batches-table php artisan migrate
定義可批次處理的任務
若要定義可批次處理的任務,您應該像平常一樣建立一個可佇列的任務;但是,您應該將 Illuminate\Bus\Batchable
trait 新增到任務類別。此 trait 提供了對 batch
方法的存取權,該方法可用於檢索任務正在其中執行的目前批次。
<?php namespace App\Jobs; use Illuminate\Bus\Batchable;use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Foundation\Queue\Queueable; class ImportCsv implements ShouldQueue{ use Batchable, Queueable; /** * Execute the job. */ public function handle(): void { if ($this->batch()->cancelled()) { // Determine if the batch has been cancelled... return; } // Import a portion of the CSV file... }}
派遣批次
若要派送一批任務,您應該使用 Bus
facade 的 batch
方法。當然,批次處理在與完成回呼結合使用時最有用。因此,您可以使用 then
、catch
和 finally
方法來定義批次的完成回呼。這些回呼中的每一個在被呼叫時都會收到一個 Illuminate\Bus\Batch
實例。在此範例中,我們假設我們正在佇列一批任務,每個任務都會處理 CSV 檔案中給定數量的列。
use App\Jobs\ImportCsv;use Illuminate\Bus\Batch;use Illuminate\Support\Facades\Bus;use Throwable; $batch = Bus::batch([ new ImportCsv(1, 100), new ImportCsv(101, 200), new ImportCsv(201, 300), new ImportCsv(301, 400), new ImportCsv(401, 500),])->before(function (Batch $batch) { // The batch has been created but no jobs have been added...})->progress(function (Batch $batch) { // A single job has completed successfully...})->then(function (Batch $batch) { // All jobs completed successfully...})->catch(function (Batch $batch, Throwable $e) { // First batch job failure detected...})->finally(function (Batch $batch) { // The batch has finished executing...})->dispatch(); return $batch->id;
批次的 ID,可以透過 $batch->id
屬性存取,可用於查詢 Laravel 命令匯流排以取得有關已派送批次的資訊。
由於批次回呼會被序列化並稍後由 Laravel 佇列執行,因此您不應在回呼中使用 $this
變數。此外,由於批次任務會包裝在資料庫交易中,因此不應在任務中執行觸發隱式提交的資料庫語句。
命名批次
如果批次被命名,某些工具(例如 Laravel Horizon 和 Laravel Telescope)可能會為批次提供更友善的使用者除錯資訊。若要為批次指定任意名稱,您可以在定義批次時呼叫 name
方法。
$batch = Bus::batch([ // ...])->then(function (Batch $batch) { // All jobs completed successfully...})->name('Import CSV')->dispatch();
批次連線和佇列
如果您想指定批次任務應使用的連線和佇列,您可以使用 onConnection
和 onQueue
方法。所有批次任務都必須在相同的連線和佇列中執行。
$batch = Bus::batch([ // ...])->then(function (Batch $batch) { // All jobs completed successfully...})->onConnection('redis')->onQueue('imports')->dispatch();
鏈和批次
您可以透過將鏈式任務放置在陣列中,來定義批次中的一組鏈式任務。例如,我們可以平行執行兩個任務鏈,並在兩個任務鏈都完成處理後執行回呼。
use App\Jobs\ReleasePodcast;use App\Jobs\SendPodcastReleaseNotification;use Illuminate\Bus\Batch;use Illuminate\Support\Facades\Bus; Bus::batch([ [ new ReleasePodcast(1), new SendPodcastReleaseNotification(1), ], [ new ReleasePodcast(2), new SendPodcastReleaseNotification(2), ],])->then(function (Batch $batch) { // ...})->dispatch();
反之,您可以透過在鏈中定義批次,在鏈中執行批次任務。例如,您可以先執行一批任務來發佈多個 Podcast,然後執行一批任務來傳送發佈通知。
use App\Jobs\FlushPodcastCache;use App\Jobs\ReleasePodcast;use App\Jobs\SendPodcastReleaseNotification;use Illuminate\Support\Facades\Bus; Bus::chain([ new FlushPodcastCache, Bus::batch([ new ReleasePodcast(1), new ReleasePodcast(2), ]), Bus::batch([ new SendPodcastReleaseNotification(1), new SendPodcastReleaseNotification(2), ]),])->dispatch();
將任務新增至批次
有時,從批次任務中將其他任務新增到批次中可能會很有用。當您需要批次處理數千個任務時,此模式會很有用,因為在 Web 請求期間派送這些任務可能需要太長時間。因此,您可能希望派送初始批次的「載入器」任務,以使用更多任務來補充批次。
$batch = Bus::batch([ new LoadImportBatch, new LoadImportBatch, new LoadImportBatch,])->then(function (Batch $batch) { // All jobs completed successfully...})->name('Import Contacts')->dispatch();
在此範例中,我們將使用 LoadImportBatch
任務來使用其他任務補充批次。若要完成此操作,我們可以使用透過任務的 batch
方法存取的批次實例上的 add
方法。
use App\Jobs\ImportContacts;use Illuminate\Support\Collection; /** * Execute the job. */public function handle(): void{ if ($this->batch()->cancelled()) { return; } $this->batch()->add(Collection::times(1000, function () { return new ImportContacts; }));}
您只能從屬於同一批次的任務中向批次新增任務。
檢查批次
提供給批次完成回呼的 Illuminate\Bus\Batch
實例具有多種屬性和方法,可協助您與給定批次任務互動並檢查。
// The UUID of the batch...$batch->id; // The name of the batch (if applicable)...$batch->name; // The number of jobs assigned to the batch...$batch->totalJobs; // The number of jobs that have not been processed by the queue...$batch->pendingJobs; // The number of jobs that have failed...$batch->failedJobs; // The number of jobs that have been processed thus far...$batch->processedJobs(); // The completion percentage of the batch (0-100)...$batch->progress(); // Indicates if the batch has finished executing...$batch->finished(); // Cancel the execution of the batch...$batch->cancel(); // Indicates if the batch has been cancelled...$batch->cancelled();
從路由返回批次
所有 Illuminate\Bus\Batch
實例都是 JSON 可序列化的,這表示您可以直接從您的應用程式路由之一返回它們,以檢索包含有關批次的資訊的 JSON 負載,包括其完成進度。這使得在您應用程式的 UI 中顯示有關批次完成進度的資訊變得方便。
若要依 ID 檢索批次,您可以使用 Bus
facade 的 findBatch
方法。
use Illuminate\Support\Facades\Bus;use Illuminate\Support\Facades\Route; Route::get('/batch/{batchId}', function (string $batchId) { return Bus::findBatch($batchId);});
取消批次
有時,您可能需要取消給定批次的執行。這可以透過呼叫 Illuminate\Bus\Batch
實例上的 cancel
方法來完成。
/** * Execute the job. */public function handle(): void{ if ($this->user->exceedsImportLimit()) { return $this->batch()->cancel(); } if ($this->batch()->cancelled()) { return; }}
如您在先前的範例中可能已經注意到,批次任務通常應在繼續執行前判斷其對應的批次是否已取消。然而,為了方便起見,您可以將 SkipIfBatchCancelled
中介層 指派給該任務。顧名思義,此中介層將指示 Laravel 在對應的批次已取消時,不要處理該任務。
use Illuminate\Queue\Middleware\SkipIfBatchCancelled; /** * Get the middleware the job should pass through. */public function middleware(): array{ return [new SkipIfBatchCancelled];}
批次失敗
當批次任務失敗時,會調用 catch
回呼(如果已指派)。此回呼僅針對批次中第一個失敗的任務調用。
允許失敗
當批次中的任務失敗時,Laravel 會自動將該批次標記為「已取消」。如果您希望,您可以停用此行為,讓任務失敗不會自動將批次標記為已取消。這可以透過在發送批次時呼叫 allowFailures
方法來完成。
$batch = Bus::batch([ // ...])->then(function (Batch $batch) { // All jobs completed successfully...})->allowFailures()->dispatch();
重試失敗的批次任務
為了方便起見,Laravel 提供了一個 queue:retry-batch
Artisan 命令,讓您可以輕鬆地重試給定批次的所有失敗任務。queue:retry-batch
命令接受批次的 UUID,其失敗的任務應被重試。
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5
修剪批次
如果沒有修剪,job_batches
表格可能會非常快速地累積記錄。為了緩解這個問題,您應該排程每天執行 queue:prune-batches
Artisan 命令。
use Illuminate\Support\Facades\Schedule; Schedule::command('queue:prune-batches')->daily();
預設情況下,所有超過 24 小時的已完成批次都會被修剪。您可以在呼叫命令時使用 hours
選項來決定要保留批次資料的時間長度。例如,以下命令將刪除所有在 48 小時前完成的批次。
use Illuminate\Support\Facades\Schedule; Schedule::command('queue:prune-batches --hours=48')->daily();
有時候,您的 jobs_batches
表格可能會累積從未成功完成的批次記錄,例如任務失敗且該任務從未成功重試的批次。您可以使用 unfinished
選項指示 queue:prune-batches
命令修剪這些未完成的批次記錄。
use Illuminate\Support\Facades\Schedule; Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();
同樣地,您的 jobs_batches
表格也可能會累積已取消批次的批次記錄。您可以使用 cancelled
選項指示 queue:prune-batches
命令修剪這些已取消的批次記錄。
use Illuminate\Support\Facades\Schedule; Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();
在 DynamoDB 中儲存批次
Laravel 還支援將批次元資訊儲存在 DynamoDB 而不是關聯式資料庫中。但是,您需要手動建立一個 DynamoDB 表格來儲存所有批次記錄。
通常,此表格應命名為 job_batches
,但您應根據應用程式 queue
設定檔中 queue.batching.table
設定的值來命名該表格。
DynamoDB 批次表格設定
job_batches
表格應具有一個名為 application
的字串主分割索引鍵和一個名為 id
的字串主排序索引鍵。索引鍵的 application
部分將包含應用程式的名稱,此名稱由應用程式 app
設定檔中的 name
設定值定義。由於應用程式名稱是 DynamoDB 表格索引鍵的一部分,因此您可以使用同一個表格來儲存多個 Laravel 應用程式的任務批次。
此外,如果您想利用自動批次修剪,您可以為表格定義 ttl
屬性。
DynamoDB 設定
接下來,安裝 AWS SDK,以便您的 Laravel 應用程式可以與 Amazon DynamoDB 通訊。
composer require aws/aws-sdk-php
然後,將 queue.batching.driver
設定選項的值設定為 dynamodb
。此外,您應在 batching
設定陣列中定義 key
、secret
和 region
設定選項。這些選項將用於與 AWS 進行身份驗證。使用 dynamodb
驅動程式時,queue.batching.database
設定選項是不必要的。
'batching' => [ 'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'), 'key' => env('AWS_ACCESS_KEY_ID'), 'secret' => env('AWS_SECRET_ACCESS_KEY'), 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'), 'table' => 'job_batches',],
在 DynamoDB 中修剪批次
當使用 DynamoDB 來儲存任務批次資訊時,用於修剪儲存在關聯式資料庫中批次的典型修剪命令將無法運作。相反地,您可以使用 DynamoDB 的原生 TTL 功能 來自動移除舊批次的記錄。
如果您使用 ttl
屬性定義了您的 DynamoDB 表格,您可以定義設定參數來指示 Laravel 如何修剪批次記錄。queue.batching.ttl_attribute
設定值定義了保存 TTL 的屬性名稱,而 queue.batching.ttl
設定值定義了在記錄上次更新後,批次記錄可以從 DynamoDB 表格中移除的秒數。
'batching' => [ 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'), 'key' => env('AWS_ACCESS_KEY_ID'), 'secret' => env('AWS_SECRET_ACCESS_KEY'), 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'), 'table' => 'job_batches', 'ttl_attribute' => 'ttl', 'ttl' => 60 * 60 * 24 * 7, // 7 days...],
佇列閉包
您也可以發送一個閉包到佇列,而不是發送一個任務類別。這非常適合需要在目前請求週期之外執行的快速、簡單任務。當發送閉包到佇列時,閉包的程式碼內容會進行加密簽名,使其無法在傳輸過程中被修改。
$podcast = App\Podcast::find(1); dispatch(function () use ($podcast) { $podcast->publish();});
使用 catch
方法,您可以提供一個閉包,如果佇列中的閉包在耗盡您佇列的所有設定的重試次數後仍無法成功完成,則應執行此閉包。
use Throwable; dispatch(function () use ($podcast) { $podcast->publish();})->catch(function (Throwable $e) { // This job has failed...});
由於 catch
回呼會被序列化,並在稍後由 Laravel 佇列執行,因此您不應在 catch
回呼中使用 $this
變數。
執行佇列工作程式
queue:work
命令
Laravel 包含一個 Artisan 命令,它將啟動佇列工作程序並處理推送到佇列的新任務。您可以使用 queue:work
Artisan 命令來執行工作程序。請注意,一旦 queue:work
命令啟動,它將持續運行,直到手動停止或關閉終端機為止。
php artisan queue:work
為了讓 queue:work
程序在背景中永久運行,您應該使用進程監控器(例如 Supervisor)來確保佇列工作程序不會停止運行。
如果您希望處理過的任務 ID 包含在命令的輸出中,您可以在調用 queue:work
命令時包含 -v
標誌。
php artisan queue:work -v
請記住,佇列工作程序是長時間運行的程序,並將啟動的應用程式狀態儲存在記憶體中。因此,它們在啟動後不會注意到您的程式碼庫中的變更。因此,在您的部署過程中,請務必重新啟動您的佇列工作程序。此外,請記住,應用程式建立或修改的任何靜態狀態都不會在任務之間自動重置。
或者,您可以執行 queue:listen
命令。當使用 queue:listen
命令時,您不必在想要重新載入更新的程式碼或重置應用程式狀態時手動重新啟動工作程序;但是,此命令的效率遠低於 queue:work
命令。
php artisan queue:listen
執行多個佇列工作程序
若要將多個工作程序指派給一個佇列並同時處理任務,您只需啟動多個 queue:work
程序。這可以透過終端機中的多個標籤在本地完成,也可以使用您的進程管理器的設定在生產環境中完成。使用 Supervisor 時,您可以使用 numprocs
設定值。
指定連線和佇列
您也可以指定工作程序應使用的佇列連線。傳遞給 work
命令的連線名稱應對應於您的 config/queue.php
設定檔中定義的其中一個連線。
php artisan queue:work redis
預設情況下,queue:work
命令只會處理給定連線的預設佇列的任務。但是,您也可以透過只處理給定連線的特定佇列,進一步自訂您的佇列工作程序。例如,如果您所有的電子郵件都在 redis
佇列連線的 emails
佇列中處理,您可以發出以下命令來啟動一個只處理該佇列的工作程序。
php artisan queue:work redis --queue=emails
處理指定數量的任務
--once
選項可用於指示工作程序只處理佇列中的一個任務。
php artisan queue:work --once
--max-jobs
選項可用於指示工作程序處理給定數量的任務,然後退出。當與Supervisor結合使用時,此選項可能很有用,以便您的工作程序在處理給定數量的任務後自動重新啟動,釋放它們可能累積的任何記憶體。
php artisan queue:work --max-jobs=1000
處理所有佇列中的任務,然後退出
--stop-when-empty
選項可用於指示工作程序處理所有任務,然後正常退出。如果您希望在佇列為空後關閉容器,在 Docker 容器中處理 Laravel 佇列時,此選項會很有用。
php artisan queue:work --stop-when-empty
處理給定秒數的任務
--max-time
選項可用於指示工作程序在給定秒數內處理任務,然後退出。當與Supervisor結合使用時,此選項可能很有用,以便您的工作程序在處理給定時間量的任務後自動重新啟動,釋放它們可能累積的任何記憶體。
# Process jobs for one hour and then exit...php artisan queue:work --max-time=3600
工作程序休眠時間
當佇列中有可用任務時,工作程序將繼續處理任務,任務之間沒有延遲。但是,如果沒有可用的任務,sleep
選項會決定工作程序將「休眠」多少秒。當然,在休眠時,工作程序將不會處理任何新任務。
php artisan queue:work --sleep=3
維護模式和佇列
當您的應用程式處於維護模式時,不會處理任何佇列中的任務。一旦應用程式退出維護模式,將會像平常一樣繼續處理任務。
若要強制您的佇列工作程序即使在啟用維護模式的情況下也處理任務,您可以使用 --force
選項。
php artisan queue:work --force
資源考量
守護程序佇列工作程序不會在處理每個任務之前「重新啟動」框架。因此,您應該在每個任務完成後釋放任何繁重的資源。例如,如果您正在使用 GD 函式庫進行影像處理,您應該在完成影像處理後使用 imagedestroy
釋放記憶體。
佇列優先順序
有時候您可能希望優先處理您的佇列。例如,在您的 config/queue.php
設定檔中,您可以將 redis
連線的預設 queue
設定為 low
。但是,有時您可能希望將任務推送到 high
優先級佇列,如下所示
dispatch((new Job)->onQueue('high'));
若要啟動一個工作程序,以驗證 high
佇列中的所有任務都已處理完畢,然後再繼續處理 low
佇列中的任何任務,請將以逗號分隔的佇列名稱列表傳遞給 work
命令。
php artisan queue:work --queue=high,low
佇列工作程式和部署
由於佇列工作程序是長時間運行的程序,因此它們在不重新啟動的情況下不會注意到您的程式碼變更。因此,使用佇列工作程序部署應用程式最簡單的方法是在部署過程中重新啟動工作程序。您可以使用 queue:restart
命令正常重新啟動所有工作程序。
php artisan queue:restart
此命令將指示所有佇列工作程序在完成處理目前任務後正常退出,以便不會遺失任何現有任務。由於佇列工作程序將在執行 queue:restart
命令時退出,因此您應該執行進程管理器(例如 Supervisor)以自動重新啟動佇列工作程序。
佇列使用快取來儲存重新啟動訊號,因此在使用此功能之前,您應該驗證您的應用程式是否已正確設定快取驅動程式。
任務過期和超時
任務過期
在您的 config/queue.php
設定檔中,每個佇列連線都定義了一個 retry_after
選項。此選項指定佇列連線在重試正在處理的任務之前應等待的秒數。例如,如果 retry_after
的值設定為 90
,則如果任務已處理 90 秒而未釋放或刪除,則該任務將被釋放回佇列。通常,您應該將 retry_after
的值設定為您的任務合理完成處理所需的最大秒數。
唯一不包含 retry_after
值的佇列連線是 Amazon SQS。SQS 將根據 AWS 控制台中管理的預設可見性逾時來重試任務。
工作者逾時
queue:work
Artisan 命令公開了一個 --timeout
選項。預設情況下,--timeout
值為 60 秒。如果任務的處理時間超過逾時值指定的秒數,則處理該任務的工作者將會錯誤退出。通常,工作者將會由您伺服器上設定的程序管理員自動重新啟動。
php artisan queue:work --timeout=60
retry_after
設定選項和 --timeout
CLI 選項是不同的,但它們共同確保任務不會遺失,並且任務只會成功處理一次。
--timeout
的值應始終比您的 retry_after
設定值至少短幾秒。這將確保在重試任務之前,處理凍結任務的工作者始終被終止。如果您的 --timeout
選項比您的 retry_after
設定值長,您的任務可能會被處理兩次。
Supervisor 設定
在生產環境中,您需要一種方法來保持您的 queue:work
程序運行。queue:work
程序可能會因為多種原因停止運行,例如超過工作者逾時或執行 queue:restart
命令。
因此,您需要設定一個程序監控器,它可以偵測到您的 queue:work
程序何時退出並自動重新啟動它們。此外,程序監控器還可以讓您指定您想要同時運行多少個 queue:work
程序。Supervisor 是 Linux 環境中常用的程序監控器,我們將在以下文件中討論如何設定它。
安裝 Supervisor
Supervisor 是 Linux 作業系統的程序監控器,如果您的 queue:work
程序失敗,它將自動重新啟動它們。要在 Ubuntu 上安裝 Supervisor,您可以使用以下命令
sudo apt-get install supervisor
如果自己設定和管理 Supervisor 聽起來令人生畏,請考慮使用 Laravel Forge,它會為您的生產 Laravel 專案自動安裝和設定 Supervisor。
設定 Supervisor
Supervisor 設定檔通常儲存在 /etc/supervisor/conf.d
目錄中。在此目錄中,您可以建立任意數量的設定檔,這些設定檔會指示 supervisor 如何監控您的程序。例如,讓我們建立一個 laravel-worker.conf
檔案,該檔案會啟動並監控 queue:work
程序
[program:laravel-worker]process_name=%(program_name)s_%(process_num)02dcommand=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600autostart=trueautorestart=truestopasgroup=truekillasgroup=trueuser=forgenumprocs=8redirect_stderr=truestdout_logfile=/home/forge/app.com/worker.logstopwaitsecs=3600
在此範例中,numprocs
指令將指示 Supervisor 運行八個 queue:work
程序並監控所有這些程序,如果它們失敗,則會自動重新啟動它們。您應該更改設定的 command
指令,以反映您想要的佇列連線和工作者選項。
您應該確保 stopwaitsecs
的值大於您運行時間最長的任務所消耗的秒數。否則,Supervisor 可能會在任務完成處理之前將其終止。
啟動 Supervisor
建立設定檔後,您可以使用以下命令更新 Supervisor 設定並啟動程序
sudo supervisorctl reread sudo supervisorctl update sudo supervisorctl start "laravel-worker:*"
有關 Supervisor 的更多資訊,請參閱 Supervisor 文件。
處理失敗的任務
有時您的佇列任務會失敗。別擔心,事情並不總是按計劃進行!Laravel 提供了一種方便的方法來指定任務應嘗試的最大次數。在非同步任務超出此嘗試次數後,它將被插入到 failed_jobs
資料庫表格中。同步調度的任務如果失敗,則不會儲存在此表中,並且其例外會立即由應用程式處理。
建立 failed_jobs
表格的遷移通常已經存在於新的 Laravel 應用程式中。但是,如果您的應用程式不包含此表格的遷移,您可以使用 make:queue-failed-table
命令來建立遷移
php artisan make:queue-failed-table php artisan migrate
在運行佇列工作者程序時,您可以使用 queue:work
命令上的 --tries
開關來指定應嘗試任務的最大次數。如果您沒有為 --tries
選項指定值,則任務只會嘗試一次,或依照任務類別的 $tries
屬性指定次數嘗試。
php artisan queue:work redis --tries=3
使用 --backoff
選項,您可以指定 Laravel 在遇到例外狀況後應等待多少秒才能重試任務。預設情況下,任務會立即釋放回佇列,以便可以再次嘗試
php artisan queue:work redis --tries=3 --backoff=3
如果您想要設定 Laravel 在每個任務的基礎上,在遇到例外狀況後應等待多少秒才能重試任務,您可以在您的任務類別上定義 backoff
屬性
/** * The number of seconds to wait before retrying the job. * * @var int */public $backoff = 3;
如果您需要更複雜的邏輯來決定任務的回退時間,您可以在您的任務類別上定義 backoff
方法
/*** Calculate the number of seconds to wait before retrying the job.*/public function backoff(): int{ return 3;}
您可以透過從 backoff
方法傳回回退值陣列來輕鬆設定「指數」回退。在此範例中,第一次重試的重試延遲時間為 1 秒,第二次重試為 5 秒,第三次重試為 10 秒,如果還有更多嘗試次數,則每次後續重試均為 10 秒
/*** Calculate the number of seconds to wait before retrying the job.** @return array<int, int>*/public function backoff(): array{ return [1, 5, 10];}
清除失敗的任務
當特定任務失敗時,您可能想要向您的使用者發送警報或還原任務部分完成的任何操作。若要完成此操作,您可以在您的任務類別上定義 failed
方法。導致任務失敗的 Throwable
實例將傳遞給 failed
方法
<?php namespace App\Jobs; use App\Models\Podcast;use App\Services\AudioProcessor;use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Foundation\Queue\Queueable;use Throwable; class ProcessPodcast implements ShouldQueue{ use Queueable; /** * Create a new job instance. */ public function __construct( public Podcast $podcast, ) {} /** * Execute the job. */ public function handle(AudioProcessor $processor): void { // Process uploaded podcast... } /** * Handle a job failure. */ public function failed(?Throwable $exception): void { // Send user notification of failure, etc... }}
在調用 failed
方法之前,會實例化任務的新實例;因此,handle
方法中可能發生的任何類別屬性修改都將會遺失。
重試失敗的任務
若要檢視已插入 failed_jobs
資料庫表格中的所有失敗任務,您可以使用 queue:failed
Artisan 命令
php artisan queue:failed
queue:failed
命令將列出任務 ID、連線、佇列、失敗時間以及有關任務的其他資訊。任務 ID 可用於重試失敗的任務。例如,若要重試 ID 為 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
的失敗任務,請發出以下命令
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
如有必要,您可以將多個 ID 傳遞給命令
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d
您也可以重試特定佇列的所有失敗任務
php artisan queue:retry --queue=name
若要重試所有失敗的任務,請執行 queue:retry
命令並傳遞 all
作為 ID
php artisan queue:retry all
如果您想要刪除失敗的任務,可以使用 queue:forget
命令
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
當使用 Horizon 時,您應該使用 horizon:forget
命令刪除失敗的任務,而不是 queue:forget
命令。
若要從 failed_jobs
表格中刪除所有失敗的任務,您可以使用 queue:flush
命令
php artisan queue:flush
忽略遺失的模型
當將 Eloquent 模型注入到任務中時,該模型會在放入佇列之前自動序列化,並在處理任務時從資料庫中重新擷取。但是,如果模型在等待工作者處理任務時已刪除,則您的任務可能會因 ModelNotFoundException
而失敗。
為了方便起見,您可以選擇透過將任務的 deleteWhenMissingModels
屬性設定為 true
,自動刪除缺少模型的任務。當此屬性設定為 true
時,Laravel 將會靜默地捨棄任務,而不會引發例外狀況
/** * Delete the job if its models no longer exist. * * @var bool */public $deleteWhenMissingModels = true;
修剪失敗的任務
您可以透過調用 queue:prune-failed
Artisan 命令來修剪應用程式的 failed_jobs
表格中的記錄
php artisan queue:prune-failed
預設情況下,所有超過 24 小時的失敗任務記錄都將被修剪。如果您向命令提供 --hours
選項,則只會保留在過去 N 小時內插入的失敗任務記錄。例如,以下命令將刪除所有在 48 小時前插入的失敗任務記錄
php artisan queue:prune-failed --hours=48
在 DynamoDB 中儲存失敗的任務
Laravel 也支援將您的失敗任務記錄儲存在 DynamoDB 中,而不是關聯式資料庫表格。但是,您必須手動建立 DynamoDB 表格來儲存所有失敗的任務記錄。通常,此表格應命名為 failed_jobs
,但您應該根據應用程式的 queue
設定檔中 queue.failed.table
設定值來命名表格。
failed_jobs
表格應具有名為 application
的字串主分割鍵和名為 uuid
的字串主排序鍵。金鑰的 application
部分將包含您的應用程式名稱,如應用程式 app
設定檔中 name
設定值所定義。由於應用程式名稱是 DynamoDB 表格金鑰的一部分,因此您可以使用相同的表格來儲存多個 Laravel 應用程式的失敗任務。
此外,請確保您已安裝 AWS SDK,以便您的 Laravel 應用程式可以與 Amazon DynamoDB 通訊
composer require aws/aws-sdk-php
接下來,將 queue.failed.driver
設定選項的值設定為 dynamodb
。此外,您應該在失敗任務設定陣列中定義 key
、secret
和 region
設定選項。這些選項將用於向 AWS 驗證。使用 dynamodb
驅動程式時,queue.failed.database
設定選項是不必要的
'failed' => [ 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'), 'key' => env('AWS_ACCESS_KEY_ID'), 'secret' => env('AWS_SECRET_ACCESS_KEY'), 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'), 'table' => 'failed_jobs',],
停用失敗的任務儲存
您可以透過將 queue.failed.driver
設定選項的值設定為 null
,指示 Laravel 捨棄失敗的任務而不儲存它們。通常,這可以透過 QUEUE_FAILED_DRIVER
環境變數來完成
QUEUE_FAILED_DRIVER=null
失敗的任務事件
如果您想要註冊一個在任務失敗時將會調用的事件監聽器,您可以使用 Queue
facade 的 failing
方法。例如,我們可以從 Laravel 包含的 AppServiceProvider
的 boot
方法將閉包附加到此事件
<?php namespace App\Providers; use Illuminate\Support\Facades\Queue;use Illuminate\Support\ServiceProvider;use Illuminate\Queue\Events\JobFailed; class AppServiceProvider extends ServiceProvider{ /** * Register any application services. */ public function register(): void { // ... } /** * Bootstrap any application services. */ public function boot(): void { Queue::failing(function (JobFailed $event) { // $event->connectionName // $event->job // $event->exception }); }}
從佇列清除任務
當使用 Horizon 時,您應該使用 horizon:clear
命令從佇列中清除任務,而不是 queue:clear
命令。
如果您想要從預設連線的預設佇列中刪除所有任務,您可以使用 queue:clear
Artisan 命令來完成
php artisan queue:clear
您也可以提供 connection
引數和 queue
選項,以從特定連線和佇列中刪除任務
php artisan queue:clear redis --queue=emails
從佇列中清除任務僅適用於 SQS、Redis 和資料庫佇列驅動程式。此外,SQS 訊息刪除過程最多需要 60 秒,因此在您清除佇列後最多 60 秒內傳送到 SQS 佇列的任務也可能會被刪除。
監控您的佇列
如果您的佇列收到大量的任務湧入,它可能會不堪負荷,導致任務完成的等待時間過長。如果您願意,當您的佇列任務計數超過指定閾值時,Laravel 可以發出警報通知您。
若要開始使用,您應該排程 queue:monitor
命令每分鐘運行一次。此命令接受您希望監控的佇列名稱以及您想要的任務計數閾值
php artisan queue:monitor redis:default,redis:deployments --max=100
單獨排程這個指令不足以觸發通知,警告您佇列已過載的狀態。當指令遇到工作數量超過您設定閾值的佇列時,將會發送一個 Illuminate\Queue\Events\QueueBusy
事件。您可以在應用程式的 AppServiceProvider
中監聽此事件,以便向您或您的開發團隊發送通知。
use App\Notifications\QueueHasLongWaitTime;use Illuminate\Queue\Events\QueueBusy;use Illuminate\Support\Facades\Event;use Illuminate\Support\Facades\Notification; /** * Bootstrap any application services. */public function boot(): void{ Event::listen(function (QueueBusy $event) { ->notify(new QueueHasLongWaitTime( $event->connection, $event->queue, $event->size )); });}
測試
在測試發送工作的程式碼時,您可能希望指示 Laravel 不要實際執行工作本身,因為工作的程式碼可以直接且獨立於發送它的程式碼進行測試。當然,要測試工作本身,您可以實例化一個工作實例,並在您的測試中直接調用 handle
方法。
您可以使用 Queue
facade 的 fake
方法來防止將排隊的工作實際推送到佇列。在調用 Queue
facade 的 fake
方法之後,您可以斷言應用程式嘗試將工作推送到佇列。
<?php use App\Jobs\AnotherJob;use App\Jobs\FinalJob;use App\Jobs\ShipOrder;use Illuminate\Support\Facades\Queue; test('orders can be shipped', function () { Queue::fake(); // Perform order shipping... // Assert that no jobs were pushed... Queue::assertNothingPushed(); // Assert a job was pushed to a given queue... Queue::assertPushedOn('queue-name', ShipOrder::class); // Assert a job was pushed twice... Queue::assertPushed(ShipOrder::class, 2); // Assert a job was not pushed... Queue::assertNotPushed(AnotherJob::class); // Assert that a Closure was pushed to the queue... Queue::assertClosurePushed(); // Assert the total number of jobs that were pushed... Queue::assertCount(3);});
<?php namespace Tests\Feature; use App\Jobs\AnotherJob;use App\Jobs\FinalJob;use App\Jobs\ShipOrder;use Illuminate\Support\Facades\Queue;use Tests\TestCase; class ExampleTest extends TestCase{ public function test_orders_can_be_shipped(): void { Queue::fake(); // Perform order shipping... // Assert that no jobs were pushed... Queue::assertNothingPushed(); // Assert a job was pushed to a given queue... Queue::assertPushedOn('queue-name', ShipOrder::class); // Assert a job was pushed twice... Queue::assertPushed(ShipOrder::class, 2); // Assert a job was not pushed... Queue::assertNotPushed(AnotherJob::class); // Assert that a Closure was pushed to the queue... Queue::assertClosurePushed(); // Assert the total number of jobs that were pushed... Queue::assertCount(3); }}
您可以傳遞一個閉包給 assertPushed
或 assertNotPushed
方法,以斷言是否推送了一個通過給定「真值測試」的工作。如果至少有一個工作通過了給定的真值測試,則斷言將成功。
Queue::assertPushed(function (ShipOrder $job) use ($order) { return $job->order->id === $order->id;});
模擬部分任務
如果您只需要偽造特定工作,同時允許其他工作正常執行,您可以將應該偽造的工作的類別名稱傳遞給 fake
方法。
test('orders can be shipped', function () { Queue::fake([ ShipOrder::class, ]); // Perform order shipping... // Assert a job was pushed twice... Queue::assertPushed(ShipOrder::class, 2);});
public function test_orders_can_be_shipped(): void{ Queue::fake([ ShipOrder::class, ]); // Perform order shipping... // Assert a job was pushed twice... Queue::assertPushed(ShipOrder::class, 2);}
您可以使用 except
方法來偽造除一組指定工作之外的所有工作。
Queue::fake()->except([ ShipOrder::class,]);
測試任務鏈
要測試工作鏈,您需要利用 Bus
facade 的偽造功能。可以使用 Bus
facade 的 assertChained
方法來斷言已發送一個工作鏈。 assertChained
方法接受一個鏈式工作的陣列作為其第一個參數。
use App\Jobs\RecordShipment;use App\Jobs\ShipOrder;use App\Jobs\UpdateInventory;use Illuminate\Support\Facades\Bus; Bus::fake(); // ... Bus::assertChained([ ShipOrder::class, RecordShipment::class, UpdateInventory::class]);
正如您在上面的範例中所見,鏈式工作的陣列可以是工作類別名稱的陣列。但是,您也可以提供實際工作實例的陣列。這樣做時,Laravel 將確保工作實例屬於同一類別,並且具有與您的應用程式發送的鏈式工作相同的屬性值。
Bus::assertChained([ new ShipOrder, new RecordShipment, new UpdateInventory,]);
您可以使用 assertDispatchedWithoutChain
方法來斷言已推送一個沒有工作鏈的工作。
Bus::assertDispatchedWithoutChain(ShipOrder::class);
測試鏈式修改
如果一個鏈式工作將工作附加或前置到現有鏈,您可以使用該工作的 assertHasChain
方法來斷言該工作具有預期的剩餘工作鏈。
$job = new ProcessPodcast; $job->handle(); $job->assertHasChain([ new TranscribePodcast, new OptimizePodcast, new ReleasePodcast,]);
可以使用 assertDoesntHaveChain
方法來斷言該工作的剩餘鏈為空。
$job->assertDoesntHaveChain();
測試鏈式批次
如果您的工作鏈包含一批工作,您可以在鏈斷言中插入一個 Bus::chainedBatch
定義,以斷言鏈式批次符合您的預期。
use App\Jobs\ShipOrder;use App\Jobs\UpdateInventory;use Illuminate\Bus\PendingBatch;use Illuminate\Support\Facades\Bus; Bus::assertChained([ new ShipOrder, Bus::chainedBatch(function (PendingBatch $batch) { return $batch->jobs->count() === 3; }), new UpdateInventory,]);
測試任務批次
可以使用 Bus
facade 的 assertBatched
方法來斷言已發送一個工作批次。傳遞給 assertBatched
方法的閉包會接收一個 Illuminate\Bus\PendingBatch
的實例,可用於檢查批次中的工作。
use Illuminate\Bus\PendingBatch;use Illuminate\Support\Facades\Bus; Bus::fake(); // ... Bus::assertBatched(function (PendingBatch $batch) { return $batch->name == 'import-csv' && $batch->jobs->count() === 10;});
您可以使用 assertBatchCount
方法來斷言已發送給定數量的批次。
Bus::assertBatchCount(3);
您可以使用 assertNothingBatched
來斷言沒有發送任何批次。
Bus::assertNothingBatched();
測試工作 / 批次互動
此外,您可能偶爾需要測試單個工作與其底層批次的互動。例如,您可能需要測試工作是否取消了其批次的進一步處理。為此,您需要透過 withFakeBatch
方法將偽造批次分配給工作。 withFakeBatch
方法會傳回一個包含工作實例和偽造批次的元組。
[$job, $batch] = (new ShipOrder)->withFakeBatch(); $job->handle(); $this->assertTrue($batch->cancelled());$this->assertEmpty($batch->added);
測試任務 / 佇列互動
有時,您可能需要測試排隊的工作將自己釋放回佇列。或者,您可能需要測試該工作是否已刪除自己。您可以透過實例化工作並調用 withFakeQueueInteractions
方法來測試這些佇列互動。
一旦工作的佇列互動被偽造,您就可以調用工作的 handle
方法。在調用工作之後,可以使用 assertReleased
、assertDeleted
、assertNotDeleted
、assertFailed
和 assertNotFailed
方法來對工作的佇列互動進行斷言。
use App\Jobs\ProcessPodcast; $job = (new ProcessPodcast)->withFakeQueueInteractions(); $job->handle(); $job->assertReleased(delay: 30);$job->assertDeleted();$job->assertNotDeleted();$job->assertFailed();$job->assertNotFailed();
任務事件
使用 Queue
facade 上的 before
和 after
方法,您可以指定在處理排隊工作之前或之後執行的回呼。這些回呼是執行額外記錄或增加儀表板統計資料的好機會。通常,您應該從服務提供者的 boot
方法中調用這些方法。例如,我們可以使用 Laravel 隨附的 AppServiceProvider
。
<?php namespace App\Providers; use Illuminate\Support\Facades\Queue;use Illuminate\Support\ServiceProvider;use Illuminate\Queue\Events\JobProcessed;use Illuminate\Queue\Events\JobProcessing; class AppServiceProvider extends ServiceProvider{ /** * Register any application services. */ public function register(): void { // ... } /** * Bootstrap any application services. */ public function boot(): void { Queue::before(function (JobProcessing $event) { // $event->connectionName // $event->job // $event->job->payload() }); Queue::after(function (JobProcessed $event) { // $event->connectionName // $event->job // $event->job->payload() }); }}
使用 Queue
facade 上的 looping
方法,您可以指定在 worker 嘗試從佇列中提取工作之前執行的回呼。例如,您可能會註冊一個閉包來回滾先前失敗的工作所留下的任何開啟的交易。
use Illuminate\Support\Facades\DB;use Illuminate\Support\Facades\Queue; Queue::looping(function () { while (DB::transactionLevel() > 0) { DB::rollBack(); }});