這篇文章給大家分享的內容是關於Swoft 源碼剖析之Swoole和Swoft的一些介紹(Task投遞/定時任務篇),有一定的參考價值,有需要的朋友可以參考一下。 前言 Swoft的任務功能基於Swoole的Task機制,或者說Swoft的Task機制本質就是對Swoole的Task機制的封裝和加強。 ...
這篇文章給大家分享的內容是關於Swoft 源碼剖析之Swoole和Swoft的一些介紹(Task投遞/定時任務篇),有一定的參考價值,有需要的朋友可以參考一下。
前言
Swoft
的任務功能基於Swoole
的Task機制
,或者說Swoft
的Task
機制本質就是對Swoole
的Task機制
的封裝和加強。
任務投遞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
class Task
{
public static function deliver(string $taskName , string $methodName , array $params = [], string $type = self::TYPE_CO, $timeout = 3)
{
$data = TaskHelper::pack( $taskName , $methodName , $params , $type );
if (!App::isWorkerStatus() && !App::isCoContext()){
return self::deliverByQueue( $data );
}
if (!App::isWorkerStatus() && App::isCoContext()){
throw new TaskException( 'Please deliver task by http!' );
}
$server = App:: $server ->getServer();
if ( $type == self::TYPE_CO) {
$tasks [0] = $data ;
$prifleKey = 'task' . '.' . $taskName . '.' . $methodName ;
App::profileStart( $prifleKey );
$result = $server ->taskCo( $tasks , $timeout );
App::profileEnd( $prifleKey );
return $result ;
}
return $server ->task( $data );
}
}
|
任務投遞Task::deliver()
將調用參數打包後根據$type
參數通過Swoole
的$server->taskCo()
或$server->task()
介面投遞到Task進程
。
Task
本身始終是同步執行的,$type
僅僅影響投遞這一操作的行為,Task::TYPE_ASYNC
對應的$server->task()
是非同步投遞,Task::deliver()
調用後馬上返回;Task::TYPE_CO
對應的$server->taskCo()
是協程投遞,投遞後讓出協程式控制制,任務完成或執行超時後Task::deliver()
才從協程返回。
任務執行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
class TaskEventListener implements TaskInterface, FinishInterface
{
public function onTask(Server $server , int $taskId , int $workerId , $data )
{
try {
$taskExecutor = App::getBean(TaskExecutor:: class );
$result = $taskExecutor ->run( $data );
} catch (\Throwable $throwable ) {
App::error(sprintf( 'TaskExecutor->run %s file=%s line=%d ' , $throwable ->getMessage(), $throwable ->getFile(), $throwable ->getLine()));
$result = false;
App::trigger(AppEvent::RESOURCE_RELEASE);
App::trigger(TaskEvent::AFTER_TASK);
}
return $result ;
}
}
|
此處是swoole.onTask
的事件回調,其職責僅僅是將將Worker
進程投遞來的打包後的數據轉發給TaskExecutor
。
Swoole
的Task
機制的本質是Worker進程
將耗時任務投遞給同步的Task進程
(又名TaskWorker
)處理,所以swoole.onTask
的事件回調是在Task進程
中執行的。上文說過,Worker進程
是你大部分HTTP
服務代碼執行的環境,但是從TaskEventListener.onTask()
方法開始,代碼的執行環境都是Task進程
,也就是說,TaskExecutor
和具體的TaskBean
都是執行在Task進程
中的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
class TaskExecutor
{
public function run(string $data )
{
$data = TaskHelper::unpack( $data );
$name = $data [ 'name' ];
$type = $data [ 'type' ];
$method = $data [ 'method' ];
$params = $data [ 'params' ];
$logid = $data [ 'logid' ] ?? uniqid( '' , true);
$spanid = $data [ 'spanid' ] ?? 0;
$collector = TaskCollector::getCollector();
if (!isset( $collector [ 'task' ][ $name ])) {
return false;
}
list(, $coroutine ) = $collector [ 'task' ][ $name ];
$task = App::getBean( $name );
if ( $coroutine ) {
$result = $this ->runCoTask( $task , $method , $params , $logid , $spanid , $name , $type );
} else {
$result = $this ->runSyncTask( $task , $method , $params , $logid , $spanid , $name , $type );
}
return $result ;
}
}
|
任務執行思路很簡單,將Worker進程
發過來的數據解包還原成原來的調用參數,根據$name
參數找到對應的TaskBean
並調用其對應的task()
方法。其中TaskBean
使用類級別註解@Task(name="TaskName")
或者@Task("TaskName")
聲明。
值得一提的一點是,@Task
註解除了name
屬性,還有一個coroutine
屬性,上述代碼會根據該參數選擇使用協程的runCoTask()
或者同步的runSyncTask()
執行Task
。但是由於而且由於Swoole
的Task進程
的執行是完全同步的,不支持協程,所以目前版本請該參數不要配置為true
。同樣的在TaskBean
中編寫的任務代碼必須的同步阻塞的或者是要能根據環境自動將非同步非阻塞和協程降級為同步阻塞的
從Process中投遞任務
前面我們提到:
Swoole
的Task
機制的本質是Worker進程
將耗時任務投遞給同步的Task進程
(又名 TaskWorker
)處理。
換句話說,Swoole
的$server->taskCo()
或$server->task()
都只能在Worker進程
中使用。
這個限制大大的限制了使用場景。 如何能夠為了能夠在Process
中投遞任務呢?Swoft
為了繞過這個限制提供了Task::deliverByProcess()
方法。其實現原理也很簡單,通過Swoole
的$server->sendMessage()
方法將調用信息從Process
中投遞到Worker進程
中,然後由Worker進程替其投遞到Task進程
當中,相關代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
public static function deliverByProcess(string $taskName , string $methodName , array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool
{
$server = App:: $server ->getServer();
$pipeMessage = App::getBean(PipeMessage:: class );
$data = [
'name' => $taskName ,
'method' => $methodName ,
'params' => $params ,
'timeout' => $timeout ,
'type' => $type ,
];
$message = $pipeMessage ->pack(PipeMessage::MESSAGE_TYPE_TASK, $data );
return $server ->sendMessage( $message , $workId );
}
|
數據打包後使用$server->sendMessage()
投遞給Worker
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public function onPipeMessage(Server $server , int $srcWorkerId , string $message )
{
$pipeMessage = App::getBean(PipeMessage:: class );
list( $type , $data ) = $pipeMessage ->unpack( $message );
App::trigger(AppEvent::PIPE_MESSAGE, null, $type , $data , $srcWorkerId );
}
|
$server->sendMessage
後,Worker進程
收到數據時會觸發一個swoole.pipeMessage
事件的回調,Swoft
會將其轉換成自己的swoft.pipeMessage
事件並觸發.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
class PipeMessageListener implements EventHandlerInterface
{
public function handle(EventInterface $event )
{
$params = $event ->getParams();
if ( count ( $params ) < 3) {
return ;
}
list( $type , $data , $srcWorkerId ) = $params ;
if ( $type != PipeMessage::MESSAGE_TYPE_TASK) {
return ;
}
$type = $data [ 'type' ];
$taskName = $data [ 'name' ];
$params = $data [ 'params' ];
$timeout = $data [ 'timeout' ];
$methodName = $data [ 'method' ];
Task::deliver( $taskName , $methodName , $params , $type , $timeout );
}
}
|
swoft.pipeMessage
事件最終由PipeMessageListener
處理。在相關的監聽其中,如果發現swoft.pipeMessage
事件由Task::deliverByProcess()
產生的,Worker進程
會替其執行一次Task::deliver()
,最終將任務數據投遞到TaskWorker進程
中。
一道簡單的回顧練習:從Task::deliverByProcess()
到某TaskBean
最終執行任務,經歷了哪些進程,而調用鏈的哪些部分又分別是在哪些進程中執行?
從Command進程或其子進程中投遞任務
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
public function deliver(string $data , int $taskWorkerId = null, $srcWorkerId = null)
{
if ( $taskWorkerId === null) {
$taskWorkerId = mt_rand( $this ->workerNum + 1, $this ->workerNum + $this ->taskNum);
}
if ( $srcWorkerId === null) {
$srcWorkerId = mt_rand(0, $this ->workerNum - 1);
}
$this ->check();
$data = $this ->pack( $data , $srcWorkerId );
$result = \msg_send( $this ->queueId, $taskWorkerId , $data , false);
if (! $result ) {
return false;
}
return true;
}
|
對於Command
進程的任務投遞,情況會更複雜一點。
上文提到的Process
,其往往衍生於Http/Rpc
服務,作為同一個Manager
的子孫進程,他們能夠拿到Swoole\Server
的句柄變數,從而通過$server->sendMessage()
,$server->task()
等方法進行任務投遞。
但在Swoft
的體系中,還有一個十分路人的角色: Command
。
Command
的進程從shell
或cronb
獨立啟動,和Http/Rpc
服務相關的進程沒有親緣關係。因此Command
進程以及從Command
中啟動的Process
進程是沒有辦法拿到Swoole\Server
的調用句柄直接通過UnixSocket
進行任務投遞的。
為了為這種進程提供任務投遞支持,Swoft
利用了Swoole
的Task進程
的一個特殊功能----消息隊列。
同一個項目中Command
和Http\RpcServer
通過約定一個message_queue_key
獲取到系統內核中的同一條消息隊列,然後Comand
進程就可以通過該消息隊列向Task進程
投遞任務了。
該機制沒有提供對外的公開方法,僅僅被包含在Task::deliver()
方法中,Swoft
會根據當前環境隱式切換投遞方式。但該消息隊列的實現依賴Semaphore
拓展,如果你想使用,需要在編譯PHP
時加上--enable-sysvmsg
參數。
定時任務
除了手動執行的普通任務,Swoft
還提供了精度為秒的定時任務功能用來在項目中替代Linux的Crontab
功能.
Swoft
用兩個前置Process
---任務計划進程:CronTimerProcess
和任務執行進程CronExecProcess
,和兩張記憶體數據表-----RunTimeTable
(任務(配置)表)OriginTable
((任務)執行表)用於定時任務的管理調度。
兩張表的每行記錄的結構如下:
1
2
3
4
您的分享是我們最大的動力!
-Advertisement-
更多相關文章
-
在MySLQ中 UPDATA 和 INSERT 數據的時候,如果數據上面帶有emoji圖標,例如:?、?、? 很容易更新或者插入不成功,導致報錯。 1 2 Error: ER_TRUNCATED_WRONG_VALUE_FOR_FIELD: Incorrect string value: '\xF0 ...
-
1. 初識文件操作 使⽤python來讀寫⽂件是非常簡單的操作。我們使⽤open()函數來打開⼀個⽂件,獲取到⽂ 件句柄,然後通過⽂件句柄就可以進⾏各種各樣的操作了,根據打開⽅式的不同能夠執⾏的操 作也會有相應的差異。 打開⽂件的⽅式: r, w, a, r+, w+, a+, rb, wb, ab ...
-
python學習之路-01 x = 1+2+3+4 print(x) print(x*5) y = x*5 print(y+100-45+2) print('泰哥泰哥,我是小弟') print('泰哥泰哥,我是三弟小妹') t-t = 2 3t_t = 23 *r = 4 _ = 'fdsa' __ ...
-
這篇文章主要介紹了關於如何使用swoole加速laravel,有著一定的參考價值,現在分享給大家,有需要的朋友可以參考一下 再來複習一下吧,導致 php 慢的各種因素中解析性語言的特性可以說是罪魁禍首,再加上,每次請求完都釋放請求時所載入的全部文件,因此也就顯得更慢了。 後來我們有了 opcache ...
-
本篇文章給大家帶來的內容是關於php為什麼需要非同步編程?php非同步編程的詳解(附示例),有一定的參考價值,有需要的朋友可以參考一下,希望對你有所幫助。 我對 php 非同步的知識還比較混亂,寫這篇是為了整理,可能有錯。 傳統的 php-fpm 一個進程執行一個請求,要達到多少併發,就要生成多少個進程。 ...
-
最近兩個月一直在研究 Swoole,那麼藉助這篇文章,我希望能夠把 Swoole 安利給更多人。雖然 Swoole 可能目前定位是一些高級 phper 的玩具,讓中低級望而生畏,可能對一些應用場景也一臉懵逼,但其實沒這麼難的。 在 Swoole 官網的自我介紹是“面向生產環境的 PHP 非同步網路通信 ...
-
這篇文章僅僅只實現一個 Redis 連接池,篇幅就太少了,順便將前幾篇整合一下。Demo 中大概包含這些點: 實現 MySQL 連接池 實現 MySQL CURD 方法的定義 實現 Redis 連接池 實現 Redis 方法的定義 滿足 HTTP、TCP、WebSocket 調用 提供 Demo 供 ...
-
下載ProcessExplorer ProcessExplorer下載地址:ProcessExplorer 下載運行代碼 打開ProcessExplorer 查看javaw.exe的pid jstack pid號 > G:\1.log(保存路徑) 查看tid javaw.exe右鍵屬性-》點Thre ...
一周排行
-Advertisement-
-
移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
-
前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
-
項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
-
話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
-
前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
-
Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
-
目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
-
1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
-
本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
-
通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...
|