隊列的基本功能: 1、立即執行;yes 2、延遲執行;yes 3、保證至少執行一次;yes 4、必須執行且最多執行一次;no 用到的數據結構: list、Sorted sets 延遲執行的機制: 1、先把數據放入SortedSets類型的queues:queue_000:delayed中 2、在執行 ...
隊列的基本功能:
1、立即執行;yes
2、延遲執行;yes
3、保證至少執行一次;yes
4、必須執行且最多執行一次;no
用到的數據結構:
list、Sorted sets
延遲執行的機制:
1、先把數據放入SortedSets類型的queues:queue_000:delayed中
2、在執行pop的時候,執行lua腳本,把SortedSets類型的queues:queue_000:delayed 中可以執行的數據rpush到list類型的queues:queue_000中
保證執行成功的機制:
1、把要執行的數據先放入SortedSets類型的queues:queue_000:reserved中
2、在執行pop的時候,執行lua腳本,把SortedSets類型的queues:queue_000:reserved 中可以執行的數據rpush到list類型的queues:queue_000中
3、任務執行成功,從SortedSets類型的queues:queue_000:reserved中執行刪除預存的數據
class RedisQueue extends Queue implements QueueContract { public function pushRaw($payload, $queue = null, array $options = []) { $this->getConnection()->rpush( $this->getQueue($queue), // list類型的queues:queue_000 $payload // $payload === "標準化後的數據,進行json格式化"的數據 ); return json_decode($payload, true)['id'] ?? null; } protected function laterRaw($delay, $payload, $queue = null) { $this->getConnection()->zadd( $this->getQueue($queue).':delayed', // SortedSets類型的queues:queue_000:delayed $this->availableAt($delay), // 延遲執行 $payload // $payload === "標準化後的數據,進行json格式化"的數據 ); return json_decode($payload, true)['id'] ?? null; } public function pop($queue = null) { // 執行lua腳本,把SortedSets類型的queues:queue_000:delayed 中可以執行的數據rpush到list類型的queues:queue_000中 // 執行lua腳本,把SortedSets類型的queues:queue_000:reserved 中可以執行的數據rpush到list類型的queues:queue_000中 $this->migrate($prefixed = $this->getQueue($queue)); // 執行lua腳本,從list類型的queues:queue_000中lpop出數據,attempts加1,然後設定超時時間並放入結構把SortedSets類型的queues:queue_000:reserved 中 list($job, $reserved) = $this->retrieveNextJob($prefixed); if ($reserved) { return new RedisJob( $this->container, $this, $job, $reserved, $this->connectionName, $queue ?: $this->default ); } } }