Swoft源碼之Swoole和Swoft的分析

来源:https://www.cnblogs.com/heyue0117/archive/2019/11/06/11809121.html
-Advertisement-
Play Games

這篇文章給大家分享的內容是關於Swoft 源碼剖析之Swoole和Swoft的一些介紹(Task投遞/定時任務篇),有一定的參考價值,有需要的朋友可以參考一下。 前言 Swoft的任務功能基於Swoole的Task機制,或者說Swoft的Task機制本質就是對Swoole的Task機制的封裝和加強。 ...


這篇文章給大家分享的內容是關於Swoft 源碼剖析之Swoole和Swoft的一些介紹(Task投遞/定時任務篇),有一定的參考價值,有需要的朋友可以參考一下。

 

前言

Swoft的任務功能基於SwooleTask機制,或者說SwoftTask機制本質就是對SwooleTask機制的封裝和加強。

任務投遞

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

//Swoft\Task\Task.php

class Task

{

    /**

     * Deliver coroutine or async task

     *

     * @param string $taskName

     * @param string $methodName

     * @param array  $params

     * @param string $type

     * @param int    $timeout

     *

     * @return bool|array

     * @throws TaskException

     */

    public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3)

    {

        $data   = TaskHelper::pack($taskName, $methodName, $params, $type);

 

        if(!App::isWorkerStatus() && !App::isCoContext()){

            return self::deliverByQueue($data);//見下文Command章節

        }

 

        if(!App::isWorkerStatus() && App::isCoContext()){

            throw new TaskException('Please deliver task by http!');

        }

 

 

        $server = App::$server->getServer();

        // Delier coroutine task

        if ($type == self::TYPE_CO) {

            $tasks[0]  = $data;

            $prifleKey = 'task' . '.' . $taskName . '.' . $methodName;

 

            App::profileStart($prifleKey);

            $result = $server->taskCo($tasks, $timeout);

            App::profileEnd($prifleKey);

 

            return $result;

        }

 

        // Deliver async task

        return $server->task($data);

    }

}

任務投遞Task::deliver()將調用參數打包後根據$type參數通過Swoole$server->taskCo()$server->task()介面投遞到Task進程
Task本身始終是同步執行的,$type僅僅影響投遞這一操作的行為,Task::TYPE_ASYNC對應的$server->task()是非同步投遞,Task::deliver()調用後馬上返回;Task::TYPE_CO對應的$server->taskCo()是協程投遞,投遞後讓出協程式控制制,任務完成或執行超時後Task::deliver()才從協程返回。

任務執行

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

//Swoft\Task\Bootstrap\Listeners\TaskEventListener

/**

 * The listener of swoole task

 * @SwooleListener({

 *     SwooleEvent::ON_TASK,

 *     SwooleEvent::ON_FINISH,

 * })

 */

class TaskEventListener implements TaskInterface, FinishInterface

{

    /**

     * @param \Swoole\Server $server

     * @param int            $taskId

     * @param int            $workerId

     * @param mixed          $data

     * @return mixed

     * @throws \InvalidArgumentException

     */

    public function onTask(Server $server, int $taskId, int $workerId, $data)

    {

        try {

            /* @var TaskExecutor $taskExecutor*/

            $taskExecutor = App::getBean(TaskExecutor::class);

            $result = $taskExecutor->run($data);

        } catch (\Throwable $throwable) {

            App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()));

            $result = false;

 

            // Release system resources

            App::trigger(AppEvent::RESOURCE_RELEASE);

 

            App::trigger(TaskEvent::AFTER_TASK);

        }

        return $result;

    }

}

此處是swoole.onTask的事件回調,其職責僅僅是將將Worker進程投遞來的打包後的數據轉發給TaskExecutor

SwooleTask機制的本質是Worker進程將耗時任務投遞給同步的Task進程(又名TaskWorker)處理,所以swoole.onTask的事件回調是在Task進程中執行的。上文說過,Worker進程是你大部分HTTP服務代碼執行的環境,但是從TaskEventListener.onTask()方法開始,代碼的執行環境都是Task進程,也就是說,TaskExecutor和具體的TaskBean都是執行在Task進程中的。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

//Swoft\Task\TaskExecutor

/**

 * The task executor

 *

 * @Bean()

 */

class TaskExecutor

{

    /**

     * @param string $data

     * @return mixed

    */

    public function run(string $data)

    {

        $data = TaskHelper::unpack($data);

 

        $name   = $data['name'];

        $type   = $data['type'];

        $method = $data['method'];

        $params = $data['params'];

        $logid  = $data['logid'] ?? uniqid('', true);

        $spanid = $data['spanid'] ?? 0;

 

 

        $collector = TaskCollector::getCollector();

        if (!isset($collector['task'][$name])) {

            return false;

        }

 

        list(, $coroutine) = $collector['task'][$name];

        $task = App::getBean($name);

        if ($coroutine) {

            $result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type);

        } else {

            $result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type);

        }

 

        return $result;

    }

}

任務執行思路很簡單,將Worker進程發過來的數據解包還原成原來的調用參數,根據$name參數找到對應的TaskBean並調用其對應的task()方法。其中TaskBean使用類級別註解@Task(name="TaskName")或者@Task("TaskName")聲明。

值得一提的一點是,@Task註解除了name屬性,還有一個coroutine屬性,上述代碼會根據該參數選擇使用協程的runCoTask()或者同步的runSyncTask()執行Task。但是由於而且由於SwooleTask進程的執行是完全同步的,不支持協程,所以目前版本請該參數不要配置為true。同樣的在TaskBean中編寫的任務代碼必須的同步阻塞的或者是要能根據環境自動將非同步非阻塞和協程降級為同步阻塞的

從Process中投遞任務

前面我們提到:

SwooleTask機制的本質是Worker進程將耗時任務投遞給同步的Task進程(又名 TaskWorker)處理。

換句話說,Swoole$server->taskCo()$server->task()都只能在Worker進程中使用。
這個限制大大的限制了使用場景。 如何能夠為了能夠在Process中投遞任務呢?Swoft為了繞過這個限制提供了Task::deliverByProcess()方法。其實現原理也很簡單,通過Swoole$server->sendMessage()方法將調用信息從Process中投遞到Worker進程中,然後由Worker進程替其投遞到Task進程當中,相關代碼如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

//Swoft\Task\Task.php

/**

 * Deliver task by process

 *

 * @param string $taskName

 * @param string $methodName

 * @param array  $params

 * @param string $type

 * @param int    $timeout

 * @param int    $workId

 *

 * @return bool

 */

public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool

{

    /* @var PipeMessageInterface $pipeMessage */

    $server      = App::$server->getServer();

    $pipeMessage = App::getBean(PipeMessage::class);

    $data = [

        'name'    => $taskName,

        'method'  => $methodName,

        'params'  => $params,

        'timeout' => $timeout,

        'type'    => $type,

    ];

 

    $message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data);

    return $server->sendMessage($message, $workId);

}

數據打包後使用$server->sendMessage()投遞給Worker:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

//Swoft\Bootstrap\Server\ServerTrait.php

/**

 * onPipeMessage event callback

 *

 * @param \Swoole\Server $server

 * @param int            $srcWorkerId

 * @param string         $message

 * @return void

 * @throws \InvalidArgumentException

 */

public function onPipeMessage(Server $server, int $srcWorkerId, string $message)

{

    /* @var PipeMessageInterface $pipeMessage */

    $pipeMessage = App::getBean(PipeMessage::class);

    list($type, $data) = $pipeMessage->unpack($message);

 

    App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId);

}

$server->sendMessage後,Worker進程收到數據時會觸發一個swoole.pipeMessage事件的回調,Swoft會將其轉換成自己的swoft.pipeMessage事件並觸發.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

//Swoft\Task\Event\Listeners\PipeMessageListener.php

/**

 * The pipe message listener

 *

 * @Listener(event=AppEvent::PIPE_MESSAGE)

 */

class PipeMessageListener implements EventHandlerInterface

{

    /**

     * @param \Swoft\Event\EventInterface $event

     */

    public function handle(EventInterface $event)

    {

        $params = $event->getParams();

        if (count($params) < 3) {

            return;

        }

 

        list($type, $data, $srcWorkerId) = $params;

 

        if ($type != PipeMessage::MESSAGE_TYPE_TASK) {

            return;

        }

 

        $type       = $data['type'];

        $taskName   = $data['name'];

        $params     = $data['params'];

        $timeout    = $data['timeout'];

        $methodName = $data['method'];

 

        // delever task

        Task::deliver($taskName, $methodName, $params, $type, $timeout);

    }

}

swoft.pipeMessage事件最終由PipeMessageListener處理。在相關的監聽其中,如果發現swoft.pipeMessage事件由Task::deliverByProcess()產生的,Worker進程會替其執行一次Task::deliver(),最終將任務數據投遞到TaskWorker進程中。

一道簡單的回顧練習:從Task::deliverByProcess()到某TaskBean 最終執行任務,經歷了哪些進程,而調用鏈的哪些部分又分別是在哪些進程中執行?

從Command進程或其子進程中投遞任務

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

//Swoft\Task\QueueTask.php

/**

 * @param string $data

 * @param int    $taskWorkerId

 * @param int    $srcWorkerId

 *

 * @return bool

 */

public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null)

{

    if ($taskWorkerId === null) {

        $taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum);

    }

 

    if ($srcWorkerId === null) {

        $srcWorkerId = mt_rand(0, $this->workerNum - 1);

    }

 

    $this->check();

    $data   = $this->pack($data, $srcWorkerId);

    $result = \msg_send($this->queueId, $taskWorkerId, $data, false);

    if (!$result) {

        return false;

    }

 

    return true;

}

對於Command進程的任務投遞,情況會更複雜一點。
上文提到的Process,其往往衍生於Http/Rpc服務,作為同一個Manager的子孫進程,他們能夠拿到Swoole\Server的句柄變數,從而通過$server->sendMessage(),$server->task()等方法進行任務投遞。

但在Swoft的體系中,還有一個十分路人的角色: Command
Command的進程從shellcronb獨立啟動,和Http/Rpc服務相關的進程沒有親緣關係。因此Command進程以及從Command中啟動的Process進程是沒有辦法拿到Swoole\Server的調用句柄直接通過UnixSocket進行任務投遞的。
為了為這種進程提供任務投遞支持,Swoft利用了SwooleTask進程的一個特殊功能----消息隊列。

使用消息隊列的Task進程.png

同一個項目中CommandHttp\RpcServer 通過約定一個message_queue_key獲取到系統內核中的同一條消息隊列,然後Comand進程就可以通過該消息隊列向Task進程投遞任務了。
該機制沒有提供對外的公開方法,僅僅被包含在Task::deliver()方法中,Swoft會根據當前環境隱式切換投遞方式。但該消息隊列的實現依賴Semaphore拓展,如果你想使用,需要在編譯PHP時加上--enable-sysvmsg參數。

定時任務

除了手動執行的普通任務,Swoft還提供了精度為秒的定時任務功能用來在項目中替代Linux的Crontab功能.

Swoft用兩個前置Process---任務計划進程:CronTimerProcess和任務執行進程CronExecProcess
,和兩張記憶體數據表-----RunTimeTable(任務(配置)表)OriginTable((任務)執行表)用於定時任務的管理調度。
兩張表的每行記錄的結構如下:

1

2

3

4

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在MySLQ中 UPDATA 和 INSERT 數據的時候,如果數據上面帶有emoji圖標,例如:?、?、? 很容易更新或者插入不成功,導致報錯。 1 2 Error: ER_TRUNCATED_WRONG_VALUE_FOR_FIELD: Incorrect string value: '\xF0 ...
  • 1. 初識文件操作 使⽤python來讀寫⽂件是非常簡單的操作。我們使⽤open()函數來打開⼀個⽂件,獲取到⽂ 件句柄,然後通過⽂件句柄就可以進⾏各種各樣的操作了,根據打開⽅式的不同能夠執⾏的操 作也會有相應的差異。 打開⽂件的⽅式: r, w, a, r+, w+, a+, rb, wb, ab ...
  • python學習之路-01 x = 1+2+3+4 print(x) print(x*5) y = x*5 print(y+100-45+2) print('泰哥泰哥,我是小弟') print('泰哥泰哥,我是三弟小妹') t-t = 2 3t_t = 23 *r = 4 _ = 'fdsa' __ ...
  • 這篇文章主要介紹了關於如何使用swoole加速laravel,有著一定的參考價值,現在分享給大家,有需要的朋友可以參考一下 再來複習一下吧,導致 php 慢的各種因素中解析性語言的特性可以說是罪魁禍首,再加上,每次請求完都釋放請求時所載入的全部文件,因此也就顯得更慢了。 後來我們有了 opcache ...
  • 本篇文章給大家帶來的內容是關於php為什麼需要非同步編程?php非同步編程的詳解(附示例),有一定的參考價值,有需要的朋友可以參考一下,希望對你有所幫助。 我對 php 非同步的知識還比較混亂,寫這篇是為了整理,可能有錯。 傳統的 php-fpm 一個進程執行一個請求,要達到多少併發,就要生成多少個進程。 ...
  • 最近兩個月一直在研究 Swoole,那麼藉助這篇文章,我希望能夠把 Swoole 安利給更多人。雖然 Swoole 可能目前定位是一些高級 phper 的玩具,讓中低級望而生畏,可能對一些應用場景也一臉懵逼,但其實沒這麼難的。 在 Swoole 官網的自我介紹是“面向生產環境的 PHP 非同步網路通信 ...
  • 這篇文章僅僅只實現一個 Redis 連接池,篇幅就太少了,順便將前幾篇整合一下。Demo 中大概包含這些點: 實現 MySQL 連接池 實現 MySQL CURD 方法的定義 實現 Redis 連接池 實現 Redis 方法的定義 滿足 HTTP、TCP、WebSocket 調用 提供 Demo 供 ...
  • 下載ProcessExplorer ProcessExplorer下載地址:ProcessExplorer 下載運行代碼 打開ProcessExplorer 查看javaw.exe的pid jstack pid號 > G:\1.log(保存路徑) 查看tid javaw.exe右鍵屬性-》點Thre ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...