當創建隊列jobs、監聽器或訂閱伺服器以推送到隊列中時,您可能會開始認為,一旦分派,隊列工作器決定如何處理您的邏輯就完全由您自己決定了。 嗯……並不是說你不能從作業內部與隊列工作器交互,但是通常情況下,哪怕你做了,也是沒必要的。 這個神奇的騷操作的出現是因為“InteractsWithQueue”這 ...
當創建隊列jobs、監聽器或訂閱伺服器以推送到隊列中時,您可能會開始認為,一旦分派,隊列工作器決定如何處理您的邏輯就完全由您自己決定了。
嗯……並不是說你不能從作業內部與隊列工作器交互,但是通常情況下,哪怕你做了,也是沒必要的。
這個神奇的騷操作的出現是因為“InteractsWithQueue”這個trait。.當排隊作業正在從隊列中拉出, 這個 [CallQueuedListener](https://github.com/laravel/framework/blob/5.8/src/Illuminate/Events/CallQueuedListener.php#L90-L104) 會檢查它是否在使用 InteractsWithQueue trait, 如果是的話,框架會將底層的“隊列jobs”實例註入到內部。
這個 “任務” 實例類似於一個包裝了真正的 Job 類的驅動,其中包含隊列連接和嘗試等信息。
背景
我將以一個轉碼 Job 為例。 這是一個將廣播音頻文件轉換成192kbps MP3格式的任務。因為這是在自由轉碼隊列中設置的,所以它的作用有限。
檢查嘗試次數
attempts()是被調用的第一個方法, 顧名思義,它返回嘗試次數,一個隊列 job總是伴隨著一個attempt啟動。
此方法旨在與其他方法一起使用 ..., 類似 fail() 或者 release() (delay). 為了便於說明,我們將通知用戶第幾次重試: 每次我們嘗試在空閑隊列中轉換(轉換代碼)時,我們都會通知用戶我們正在第幾次重試,讓他可以選擇取消將來的轉換(轉換代碼)。
1 <?php 2 namespace App\Jobs; 3 use App\Podcast; 4 use Transcoder\Transcoder; 5 use Illuminate\Bus\Queueable; 6 use Illuminate\Queue\SerializesModels; 7 use App\Notifications\PodcastTranscoded; 8 use Illuminate\Queue\InteractsWithQueue; 9 use Illuminate\Foundation\Bus\Dispatchable; 10 use App\Notifications\RetyingPodcastTranscode; 11 class TranscodePodcast 12 { 13 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; 14 /** 15 * Transcoder Instance 16 * 17 * @var \App\Podcast 18 */ 19 protected $podcast; 20 /** 21 * 創建一個新的轉碼podcast實例。 22 * 23 * @param \App\Podcast $podcast 24 * @return void 25 */ 26 public function __construct(Podcast $podcast) 27 { 28 $this->podcast = $podcast; 29 } 30 /** 31 * 執行隊列job. 32 * 33 * @param \Transcoder\Transcoder $podcast 34 * @return void 35 */ 36 public function handle(Transcoder $transcoder) 37 { 38 // 告訴用戶我們第幾次重試 39 if ($this->attempts() > 1) { 40 $this->podcast->publisher->notify(new RetryingPodcastTranscode($this->podcast, $this->attempts()); 41 } 42 $transcoded = $this->transcoder->setFile($event->podcast) 43 ->format('mp3') 44 ->bitrate(192) 45 ->start(); 46 47 48 // 將轉碼podcast與原始podcast關聯 49 $this->podcast->transcode()->associate($transcoded); 50 51 // 通知podcast的發佈者他的podcast已經準備好了 52 53 $this->publisher->notify(new PodcastTranscoded($this->podcast)); 54 } 55 }
告訴用戶我們將在第幾次時重試某些內容,這在邏輯預先失敗時很有用,讓用戶(或開發人員)檢查出了什麼問題,但當然您可以做更多的事情。
就我個人而言,我喜歡在“Job作業”失敗後再這樣做,如果還有重試時間,告訴他我們稍後會重試當然,這個例子只是為了舉例說明。
刪除作業隊列 Job
第二個方法就是 delete(). 跟你猜想的一樣,您可以從隊列中刪除當前的“隊列 Job”。 當隊列 Job或偵聽器由於多種原因排隊後不應處理時,這將會很方便,例如,考慮一下這個場景:在轉碼發生之前,上傳podcast的發佈者由於任何原因(比如TOS衝突)被停用,我們應該不處理podcast。
我們將在前面的示例中添加該代碼:
1 <?php 2 namespace App\Jobs; 3 use App\Podcast; 4 use Transcoder\Transcoder; 5 use Illuminate\Bus\Queueable; 6 use Illuminate\Queue\SerializesModels; 7 use App\Notifications\PodcastTranscoded; 8 use Illuminate\Queue\InteractsWithQueue; 9 use Illuminate\Foundation\Bus\Dispatchable; 10 use App\Notifications\RetyingPodcastTranscode; 11 class TranscodePodcast 12 { 13 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; 14 /** 15 * Transcoder Instance 16 * 17 * @var \App\Podcast 18 */ 19 protected $podcast; 20 /** 21 * 創建一個新的轉碼podcast實例。 22 * 23 * @param \App\Podcast $podcast 24 * @return void 25 */ 26 public function __construct(Podcast $podcast) 27 { 28 $this->podcast = $podcast; 29 } 30 /** 31 * 執行隊列 job. 32 * 33 * @param \Transcoder\Transcoder $podcast 34 * @return void 35 */ 36 public function handle(Transcoder $transcoder) 37 { 38 // 如果發佈伺服器已被停用,請刪除此隊列job 39 if ($this->podcast->publisher->isDeactivated()) { 40 $this->delete(); 41 } 42 // 告訴用戶我們第幾次重試 43 if ($this->attempts() > 1) { 44 $this->podcast->publisher->notify(new RetryingPodcastTranscode($this->podcast, $this->attempts()); 45 } 46 $transcoded = $this->transcoder->setFile($event->podcast) 47 ->format('mp3') 48 ->bitrate(192) 49 ->start(); 50 51 // 將轉碼podcast與原始podcast關聯 52 $this->podcast->transcode()->associate($transcoded); 53 54 // 通知podcast的發佈者他的podcast已經準備好了 55 $this->publisher->notify(new PodcastTranscoded($this->podcast)); 56 } 57 }
如果需要刪除可能已刪除的模型上的作業,則可能需要 設置 [$deleteWhenMissingModels](https://laravel.com/docs/5.8/queues#ignoring-missing-models) 為真 t避免處理不存在的東西。
失敗的隊列job
當您需要控制人為破壞邏輯時,這非常非常方便, 因為使用空的“return”語句會將“Job”標記為已成功完成。您可以強制使排隊的作業失敗,希望出現異常,允許處理程式在可能的情況下稍後重試。
這使您在作業失敗時可以更好地控制在任何情況下,也可以使用“failed()”方法, 它允許你 失敗後執行任何清潔操縱, 比如通知用戶或者刪除一些東西。
在此示例中,如果由於任何原因(如 CDN 關閉時)無法從存儲中檢索podcast ,則作業將失敗,並引發自定義異常。
1 <?php 2 namespace App\Jobs; 3 use App\Podcast; 4 use Transcoder\Transcoder; 5 use Illuminate\Bus\Queueable; 6 use Illuminate\Queue\SerializesModels; 7 use App\Exceptions\PodcastUnretrievable; 8 use App\Notifications\PodcastTranscoded; 9 use Illuminate\Queue\InteractsWithQueue; 10 use Illuminate\Foundation\Bus\Dispatchable; 11 use App\Notifications\RetyingPodcastTranscode; 12 class TranscodePodcast 13 { 14 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; 15 /** 16 * 轉碼器實例 17 * 18 * @var \App\Podcast 19 */ 20 protected $podcast; 21 /** 22 * 創建一個新的轉碼Podcast實例。 23 * 24 * @param \App\Podcast $podcast 25 * @return void 26 */ 27 public function __construct(Podcast $podcast) 28 { 29 $this->podcast = $podcast; 30 } 31 /** 32 * 執行隊列 job. 33 * 34 * @param \Transcoder\Transcoder $podcast 35 * @return void 36 */ 37 public function handle(Transcoder $transcoder) 38 { 39 // 如果發佈伺服器已被停用,請刪除此隊列job 40 if ($this->podcast->publisher->isDeactivated()) { 41 $this->delete(); 42 } 43 //如果podcast不能從storage存儲中檢索,我們就會失敗。 44 if ($this->podcast->fileDoesntExists()) { 45 $this->fail(new PodcastUnretrievable($this->podcast)); 46 } 47 // 告訴用戶我們第幾次重試 48 if ($this->attempts() > 1) { 49 $this->podcast->publisher->notify(new RetryingPodcastTranscode($this->podcast, $this->attempts()); 50 } 51 52 $transcoded = $this->transcoder->setFile($event->podcast) 53 ->format('mp3') 54 ->bitrate(192) 55 ->start(); 56 57 // 將轉碼podcast與原始podcast關聯 58 $this->podcast->transcode()->associate($transcoded); 59 60 // 通知podcast的發佈者他的podcast已經準備好了 61 $this->publisher->notify(new PodcastTranscoded($this->podcast)); 62 } 63 }
現在,進入最後的方法。
釋放(延遲)隊列job
這可能是trait性狀的最有用的方法, 因為它可以讓你在未來進一步推動這項隊列job. 此方法用於 隊列job速率限制.
除了速率限制之外,您還可以在某些不可用但希望在不久的將來使用它的情況下使用它同時,避免先發制人地失敗。
在最後一個示例中,我們將延遲轉碼以備稍後使用:如果轉碼器正在大量使用,我們將延遲轉碼5分鐘直到負載降低。
1 <?php 2 namespace App\Jobs; 3 use App\Podcast; 4 use Transcoder\Transcoder; 5 use Illuminate\Bus\Queueable; 6 use Illuminate\Queue\SerializesModels; 7 use App\Exceptions\PodcastUnretrievable; 8 use App\Notifications\PodcastTranscoded; 9 use Illuminate\Queue\InteractsWithQueue; 10 use App\Notifications\TranscoderHighUsage; 11 use Illuminate\Foundation\Bus\Dispatchable; 12 use App\Notifications\RetyingPodcastTranscode; 13 class TranscodePodcast 14 { 15 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; 16 /** 17 * Transcoder Instance 18 * 19 * @var \App\Podcast 20 */ 21 protected $podcast; 22 /** 23 * 創建一個新的轉碼podcast實例。 24 * 25 * @param \App\Podcast $podcast 26 * @return void 27 */ 28 public function __construct(Podcast $podcast) 29 { 30 $this->podcast = $podcast; 31 } 32 /** 33 * 執行隊列job. 34 * 35 * @param \Transcoder\Transcoder $podcast 36 * @return void 37 */ 38 public function handle(Transcoder $transcoder) 39 { 40 // 如果發佈伺服器已被停用,請刪除此隊列job 41 if ($this->podcast->publisher->isDeactivated()) { 42 $this->delete(); 43 } 44 // 如果podcast不能從storage存儲中檢索,我們就會失敗。 45 if ($this->podcast->fileDoesntExists()) { 46 $this->fail(new PodcastUnretrievable($this->podcast)); 47 } 48 49 // 如果轉碼器使用率很高,我們將 50 // t延遲轉碼5分鐘. 否則我們可能會有拖延轉碼器進程的危險 51 // 它會把所有的轉碼子進程都記錄下來。 52 if ($transcoder->getLoad()->isHigh()) { 53 $delay = 60 * 5; 54 $this->podcast->publisher->notify(new TranscoderHighUsage($this->podcast, $delay)); 55 $this->release($delay); 56 } 57 // 告訴用戶我們第幾次重試 58 if ($this->attempts() > 1) { 59 $this->podcast->publisher->notify(new RetryingPodcastTranscode($this->podcast, $this->attempts()); 60 } 61 62 $transcoded = $this->transcoder->setFile($event->podcast) 63 ->format('mp3') 64 ->bitrate(192) 65 ->start(); 66 67 // 將轉碼podcast與原始podcast關聯 68 $this->podcast->transcode()->associate($transcoded); 69 70 // 通知podcast的發佈者他的podcast已經準備好了 71 $this->publisher->notify(new PodcastTranscoded($this->podcast)); 72 } 73 }
我們可以使用一些特殊方法,例如,獲得分配給轉碼器的一些時隙,如果轉碼器時隙已滿,則延遲作業。 在排隊的工作中,你能做的就只有這些了。排隊愉快。