swoole中使用task進程非同步的處理耗時任務

来源:https://www.cnblogs.com/wadhf/archive/2019/11/08/11823293.html

我們知道,swoole中有兩大進程,分別是 master 主進程和 manager 管理進程。 其中 master 主進程中會有一個主 reactor 線程和多個 reactor 線程,主要的作用就是用來維護TCP連接,處理網路IO,收發數據。 而 manager 管理進程,作用則是 fork 和管 ...


我們知道,swoole中有兩大進程,分別是 master 主進程和 manager 管理進程。

其中 master 主進程中會有一個主 reactor 線程和多個 reactor 線程,主要的作用就是用來維護TCP連接,處理網路IO,收發數據。

而 manager 管理進程,作用則是 fork 和管理 worker 和 task 進程。

worker 進程的作用是接收 reactor 線程傳遞的數據,並處理數據,返回處理結果給 reactor 線程。

task 進程的作用是處理一些相對耗時的任務,task 與 worker 進程是獨立的,不會影響 worker 進程處理客戶端的請求。

 

一、task 進程的應用場景:

1、相對耗時的郵件群發,比如某某活動,需要給100W用戶發送活動郵件。

2、推送某些大V的動態,比如某大V發了條新消息,粉絲需要及時獲取到該動態。

 

二、worker 與 task 的相互關係:

1、worker 進程中能過調用 task() 來投遞任務,task 進程中 通過 onTask 事件來響應投遞來的任務。

2、task 進程中 通過 直接返回 或 調用 finish() 來告訴 worker 進程任務處理完畢,worker 進程中 通過 onFinish 事件響應任務完成。

 

三、使用 task 的前題:

1、在 Server 中 配置 task_worker_num 數量。

2、設置 Server 的 onTask 和 onFinish 事件回調函數。

 

四、簡單的使用task進行累加和的計算例子

<?php
 
$server = new swoole_server('0.0.0.0', 6666);
 
$server->set([
    'worker_num' => 2,
    'task_worker_num' => 16,
]);
 
$server->on('WorkerStart', function ($server, $worker_id) {
    //註意這裡,我們通過taskworker來判斷是task進程還是worker進程
    //需要在worker進程中調用task(),不然會報出警告
    //這裡會執行兩遍,因為我們設置了worker_num數為2
    if (!$server->taskworker) {
        echo '投遞任務開始...', PHP_EOL;
        //投遞32個累加計算任務給16個task進程
        for ($ix = 0; $ix < 32; $ix++) {
            //註意這裡的投遞是非同步的
            $server->task([mt_rand(1, 100), mt_rand(1000, 9999)]);
        }
        echo '投遞任務結束...', PHP_EOL;
    }
});
 
//server服務必須要有onReceive回調
$server->on('Receive', function ($server, $fd, $reactor_id, $data) {
 
});
 
//註意,task進程完全是同步阻塞模式的
$server->on('Task', function ($server, $task_id, $src_worker_id, $data) {
    echo "task {$task_id} 進程正在工作...", PHP_EOL;
    $start = $data[0];
    $end = $data[1];
    $total = 0;
    for (; $start <= $end; $start++) {
        $total += $start;
    }
    echo "task {$task_id} 進程完成工作...", PHP_EOL;
    return $total;
});
 
$server->on('Finish', function ($server, $task_id, $data) {
    echo "task {$task_id} 進程處理完成, 結果為 {$data}", PHP_EOL;
});
 
$server->start();

註意,我們通過調用 task() 往任務池中投遞任務,swoole 底層會輪詢的投遞任務到各個 task 進程。

當你投遞任務的數量超過 onTask 的處理速度,這會導致任務池被塞滿,進而導致 worker 進程發生阻塞,所以需合理設置 task_worker_num 數量和處理速度之間的關係。

當然,我們也可以人為的把任務投遞到指定的 task 進程。task() 函數的第二個參數可以指定要投遞的 task 進程ID,ID範圍為 0 到 (task_worker_num - 1)。

 

五、對任務進行切分,人為控制投遞到 task 進程

<?php
 
$server = new swoole_server('0.0.0.0', 6666);
 
$server->set([
    'worker_num' => 1,
    'task_worker_num' => 10,
]);
 
$server->on('WorkerStart', function ($server, $worker_id) {
    //為了方便演示,把worker_num設置為1,這裡只會執行一次
    if (!$server->taskworker) {
        //通過swoole_table共用記憶體,在不同進程中共用數據
        $server->result = new swoole_table(10240);
        //用於保存task進程完成數量
        $server->result->column('finish_nums', swoole_table::TYPE_INT);
        //用於保存最終計算結果
        $server->result->column('result', swoole_table::TYPE_INT);
        $server->result->create();
        //計算1000的累加和,並把計算任務分配到10個task進程上
        $num = 1000;
        $step = $num / $server->setting['task_worker_num'];
        for ($ix = 0; $ix < $server->setting['task_worker_num']; $ix++) {
            $start = $ix * $step;
            $server->task([$start, $start + $step], $ix);
        }
    }
});
 
$server->on('Receive', function ($server, $fd, $reactor_id, $data) {
 
});
 
//註意,task進程完全是同步阻塞模式的
$server->on('Task', function ($server, $task_id, $src_worker_id, $data) {
    echo "task {$task_id} 進程正在工作... 計算 {$data[0]} - {$data[1]} ", PHP_EOL;
    $start = ++$data[0];
    $end = $data[1];
    $total = 0;
    for (; $start <= $end; $start++) {
        $total += $start;
    }
    echo "task {$task_id} 進程完成工作...", PHP_EOL;
    return $total;
});
 
$server->on('Finish', function ($server, $task_id, $data) {
    echo "task {$task_id} 進程處理完成, 結果為 {$data}", PHP_EOL;
    $server->result->incr('finish_nums', 'finish_nums');
    $server->result->set('result', ['result' => $data + $server->result->get('result', 'result')]);
 
    if ($server->result->get('finish_nums', 'finish_nums') == $server->setting['task_worker_num']) {
        echo "最終計算結果:{$server->result->get('result', 'result')}", PHP_EOL;
    }
});
 
$server->s
tart();

 


您的分享是我們最大的動力!

更多相關文章
  • 題目: 鏈接:https://www.nowcoder.com/questionTerminal/6736cc3ffd1444a4a0057dee89be789b?orderByHotValue來源:牛客網牛牛舉辦了一次編程比賽,參加比賽的有3*n個選手,每個選手都有一個水平值a_i.現在要將這些選 ...
  • 1 先談Finalize() finalize()能做的所有工作,使用try-finally或者其他方式都可以做得更好、更及時,所以筆者建議大家完全可以忘掉Java語言中有這個方法的存在。 ——《深入理解JVM》 finalize()方法確實可以實現一次對象的自救,但是其不確定性和昂貴的運行代價都表 ...
  • 通過前面2篇文章我們搭建了SW的基礎環境,監控了微服務,能瞭解所有服務的運行情況。但是當出現服務響應慢,介面耗時嚴重時我們需要立即定位到問題,這就需要我們今天的主角 監控告警,同時此篇也是SW系列的最後一篇。 UI參數 首先我們認識一下SW DashBoard上的幾個關鍵參數,如下圖所示 告警配置 ...
  • 一.docker簡介 1、docker定義:docker是一個用來裝應用的容器,就像杯子可以裝水,筆筒可以裝筆,書包可以放書一樣。你可以把“Hello World!”放到docker中,也可以把網站放到docker中,你可以把任何你想到的程式放到docker中。 2、docker思想: (1)集裝箱 ...
  • 智力題目有三個容積分別為3升、5升、8升的水桶,其中容積為8升的水桶中裝滿了水,容積為3升和容積為5升的水桶都是空的。三個水桶都沒有刻度,現在需要將大水桶中的8升水等分成兩份,每份都是4升水,附加條件是只能這三個水桶,不能藉助其他輔助容器。“恩,是的,這是一個很經典的問題。”“然而,我們並不能想全, ...
  • 今天,在Anaconda prompt啟動python遇到瞭如下錯誤: UnicodeDecodeError: ‘gbk’ codec can’t decode byte 0xaf in position 553: illegal multibyte sequence 看了看出錯跟蹤,查看瞭如下位置 ...
  • 許多小伙伴對於java中的三種初始化塊的執行順序一直感到頭疼,接下來我們就來分析一下這三種初始化塊到底是怎麼運行的。有些公司也會將這個問題作為筆試題目。 下麵通過一段代碼來看看創建對象時這麼初始化塊是如何運行的 package com.hxy; public class CodeBlock{ pub ...
  • 字元串或串(String)是由數字、字母、下劃線組成的一串字元。一般記為 s=“a1a2···an”(n>=0)。它是編程語言中表示文本的數據類型。在程式設計中,字元串(string)為符號或數值的一個連續序列,如符號串(一串字元)或二進位數字串(一串二進位數字)。 String類型你一定不陌生,畢 ...
一周排行
  • C#中的DefaultView方法 簡介: 首先可建立一個表,對錶進行填充若幹條數據,代碼如下: //創建Table1 DataTable dt = new DataTable(); //對Table1添加列名,並設置列值類型 DataTable dt1 = new DataTable();//創建 ...
  • 1、運行程式報錯: FailFast: Couldn't find a valid ICU package installed on the system. 解決方法: yum install icu -y 2、程式運行後,本地可以訪問,但其他機器無法訪問,需要開放埠 firewall-cmd - ...
  • 只是一個Demo,所用有很多功能也沒有添加進去如分頁,輸入驗證,頁面也沒有進行精心佈局。 整體先來幾張圖解 ...
  • Core提供二種開發模式:Core Pages和Core MVC,今天介紹的是Core MVC。 1、創建web MVC項目 新建service/h_r.baseservice類庫文件、data/h_r.efdata類庫文件、common/h_r.common類庫文件。 引入需要的CSS文件和JS文 ...
  • 學習網址:https://docs.microsoft.com/zh-cn/visualstudio/get-started/visual-studio-ide?view=vs-2019 示範 vs2019: 變數的重命名的重構,更改該變數命名的同時,引用該變數的地方也會更改,如果該變數有被反射用到 ...
  • 1、在data裡面新建個Entity文件用於存放表映射,設計資料庫,執行如下語句 Scaffold-DbContext -Force "server=.;user=sunyong;password=1qaz!QAZ;database=hr;" Microsoft.EntityFrameworkCor ...
  • 1、發送郵件類,百度一大堆,這裡用的也是直接百度拿過來的 public static bool get_send_email(email email, string Title, string Body) { MailMessage mailMsg = new MailMessage(); mail ...
  • 1、添加用戶列表控制器,用於用戶列表顯示,登錄,增刪改查,郵件發送,下載 public userlistController(MainDbContext _db, ILogger<operatorlog> _logger, IOptions<email> sendMail) { db = _db; ...
  • 1、用戶列表頁面 @{ Layout = Layout = null;}<table id="datalistuser" class="easyui-datagrid" url="/userlist/getuserlist" toolbar="#toolbaruser" rownumbers="tr ...
  • 1、引用包 Microsoft.EntityFrameworkCore.Tools Microsoft.EntityFrameworkCore.SqlServer Microsoft.AspNetCore.Mvc.Razor.RuntimeCompilation Microsoft.AspNetCo ...