swoole中使用task進程非同步的處理耗時任務

来源:https://www.cnblogs.com/wadhf/archive/2019/11/08/11823293.html
-Advertisement-
Play Games

我們知道,swoole中有兩大進程,分別是 master 主進程和 manager 管理進程。 其中 master 主進程中會有一個主 reactor 線程和多個 reactor 線程,主要的作用就是用來維護TCP連接,處理網路IO,收發數據。 而 manager 管理進程,作用則是 fork 和管 ...


我們知道,swoole中有兩大進程,分別是 master 主進程和 manager 管理進程。

其中 master 主進程中會有一個主 reactor 線程和多個 reactor 線程,主要的作用就是用來維護TCP連接,處理網路IO,收發數據。

而 manager 管理進程,作用則是 fork 和管理 worker 和 task 進程。

worker 進程的作用是接收 reactor 線程傳遞的數據,並處理數據,返回處理結果給 reactor 線程。

task 進程的作用是處理一些相對耗時的任務,task 與 worker 進程是獨立的,不會影響 worker 進程處理客戶端的請求。

 

一、task 進程的應用場景:

1、相對耗時的郵件群發,比如某某活動,需要給100W用戶發送活動郵件。

2、推送某些大V的動態,比如某大V發了條新消息,粉絲需要及時獲取到該動態。

 

二、worker 與 task 的相互關係:

1、worker 進程中能過調用 task() 來投遞任務,task 進程中 通過 onTask 事件來響應投遞來的任務。

2、task 進程中 通過 直接返回 或 調用 finish() 來告訴 worker 進程任務處理完畢,worker 進程中 通過 onFinish 事件響應任務完成。

 

三、使用 task 的前題:

1、在 Server 中 配置 task_worker_num 數量。

2、設置 Server 的 onTask 和 onFinish 事件回調函數。

 

四、簡單的使用task進行累加和的計算例子

<?php
 
$server = new swoole_server('0.0.0.0', 6666);
 
$server->set([
    'worker_num' => 2,
    'task_worker_num' => 16,
]);
 
$server->on('WorkerStart', function ($server, $worker_id) {
    //註意這裡,我們通過taskworker來判斷是task進程還是worker進程
    //需要在worker進程中調用task(),不然會報出警告
    //這裡會執行兩遍,因為我們設置了worker_num數為2
    if (!$server->taskworker) {
        echo '投遞任務開始...', PHP_EOL;
        //投遞32個累加計算任務給16個task進程
        for ($ix = 0; $ix < 32; $ix++) {
            //註意這裡的投遞是非同步的
            $server->task([mt_rand(1, 100), mt_rand(1000, 9999)]);
        }
        echo '投遞任務結束...', PHP_EOL;
    }
});
 
//server服務必須要有onReceive回調
$server->on('Receive', function ($server, $fd, $reactor_id, $data) {
 
});
 
//註意,task進程完全是同步阻塞模式的
$server->on('Task', function ($server, $task_id, $src_worker_id, $data) {
    echo "task {$task_id} 進程正在工作...", PHP_EOL;
    $start = $data[0];
    $end = $data[1];
    $total = 0;
    for (; $start <= $end; $start++) {
        $total += $start;
    }
    echo "task {$task_id} 進程完成工作...", PHP_EOL;
    return $total;
});
 
$server->on('Finish', function ($server, $task_id, $data) {
    echo "task {$task_id} 進程處理完成, 結果為 {$data}", PHP_EOL;
});
 
$server->start();

註意,我們通過調用 task() 往任務池中投遞任務,swoole 底層會輪詢的投遞任務到各個 task 進程。

當你投遞任務的數量超過 onTask 的處理速度,這會導致任務池被塞滿,進而導致 worker 進程發生阻塞,所以需合理設置 task_worker_num 數量和處理速度之間的關係。

當然,我們也可以人為的把任務投遞到指定的 task 進程。task() 函數的第二個參數可以指定要投遞的 task 進程ID,ID範圍為 0 到 (task_worker_num - 1)。

 

五、對任務進行切分,人為控制投遞到 task 進程

<?php
 
$server = new swoole_server('0.0.0.0', 6666);
 
$server->set([
    'worker_num' => 1,
    'task_worker_num' => 10,
]);
 
$server->on('WorkerStart', function ($server, $worker_id) {
    //為了方便演示,把worker_num設置為1,這裡只會執行一次
    if (!$server->taskworker) {
        //通過swoole_table共用記憶體,在不同進程中共用數據
        $server->result = new swoole_table(10240);
        //用於保存task進程完成數量
        $server->result->column('finish_nums', swoole_table::TYPE_INT);
        //用於保存最終計算結果
        $server->result->column('result', swoole_table::TYPE_INT);
        $server->result->create();
        //計算1000的累加和,並把計算任務分配到10個task進程上
        $num = 1000;
        $step = $num / $server->setting['task_worker_num'];
        for ($ix = 0; $ix < $server->setting['task_worker_num']; $ix++) {
            $start = $ix * $step;
            $server->task([$start, $start + $step], $ix);
        }
    }
});
 
$server->on('Receive', function ($server, $fd, $reactor_id, $data) {
 
});
 
//註意,task進程完全是同步阻塞模式的
$server->on('Task', function ($server, $task_id, $src_worker_id, $data) {
    echo "task {$task_id} 進程正在工作... 計算 {$data[0]} - {$data[1]} ", PHP_EOL;
    $start = ++$data[0];
    $end = $data[1];
    $total = 0;
    for (; $start <= $end; $start++) {
        $total += $start;
    }
    echo "task {$task_id} 進程完成工作...", PHP_EOL;
    return $total;
});
 
$server->on('Finish', function ($server, $task_id, $data) {
    echo "task {$task_id} 進程處理完成, 結果為 {$data}", PHP_EOL;
    $server->result->incr('finish_nums', 'finish_nums');
    $server->result->set('result', ['result' => $data + $server->result->get('result', 'result')]);
 
    if ($server->result->get('finish_nums', 'finish_nums') == $server->setting['task_worker_num']) {
        echo "最終計算結果:{$server->result->get('result', 'result')}", PHP_EOL;
    }
});
 
$server->s
tart();

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 題目: 鏈接:https://www.nowcoder.com/questionTerminal/6736cc3ffd1444a4a0057dee89be789b?orderByHotValue來源:牛客網牛牛舉辦了一次編程比賽,參加比賽的有3*n個選手,每個選手都有一個水平值a_i.現在要將這些選 ...
  • 1 先談Finalize() finalize()能做的所有工作,使用try-finally或者其他方式都可以做得更好、更及時,所以筆者建議大家完全可以忘掉Java語言中有這個方法的存在。 ——《深入理解JVM》 finalize()方法確實可以實現一次對象的自救,但是其不確定性和昂貴的運行代價都表 ...
  • 通過前面2篇文章我們搭建了SW的基礎環境,監控了微服務,能瞭解所有服務的運行情況。但是當出現服務響應慢,介面耗時嚴重時我們需要立即定位到問題,這就需要我們今天的主角 監控告警,同時此篇也是SW系列的最後一篇。 UI參數 首先我們認識一下SW DashBoard上的幾個關鍵參數,如下圖所示 告警配置 ...
  • 一.docker簡介 1、docker定義:docker是一個用來裝應用的容器,就像杯子可以裝水,筆筒可以裝筆,書包可以放書一樣。你可以把“Hello World!”放到docker中,也可以把網站放到docker中,你可以把任何你想到的程式放到docker中。 2、docker思想: (1)集裝箱 ...
  • 智力題目有三個容積分別為3升、5升、8升的水桶,其中容積為8升的水桶中裝滿了水,容積為3升和容積為5升的水桶都是空的。三個水桶都沒有刻度,現在需要將大水桶中的8升水等分成兩份,每份都是4升水,附加條件是只能這三個水桶,不能藉助其他輔助容器。“恩,是的,這是一個很經典的問題。”“然而,我們並不能想全, ...
  • 今天,在Anaconda prompt啟動python遇到瞭如下錯誤: UnicodeDecodeError: ‘gbk’ codec can’t decode byte 0xaf in position 553: illegal multibyte sequence 看了看出錯跟蹤,查看瞭如下位置 ...
  • 許多小伙伴對於java中的三種初始化塊的執行順序一直感到頭疼,接下來我們就來分析一下這三種初始化塊到底是怎麼運行的。有些公司也會將這個問題作為筆試題目。 下麵通過一段代碼來看看創建對象時這麼初始化塊是如何運行的 package com.hxy; public class CodeBlock{ pub ...
  • 字元串或串(String)是由數字、字母、下劃線組成的一串字元。一般記為 s=“a1a2···an”(n>=0)。它是編程語言中表示文本的數據類型。在程式設計中,字元串(string)為符號或數值的一個連續序列,如符號串(一串字元)或二進位數字串(一串二進位數字)。 String類型你一定不陌生,畢 ...
一周排行
    -Advertisement-
    Play Games
  • Timer是什麼 Timer 是一種用於創建定期粒度行為的機制。 與標準的 .NET System.Threading.Timer 類相似,Orleans 的 Timer 允許在一段時間後執行特定的操作,或者在特定的時間間隔內重覆執行操作。 它在分散式系統中具有重要作用,特別是在處理需要周期性執行的 ...
  • 前言 相信很多做WPF開發的小伙伴都遇到過表格類的需求,雖然現有的Grid控制項也能實現,但是使用起來的體驗感並不好,比如要實現一個Excel中的表格效果,估計你能想到的第一個方法就是套Border控制項,用這種方法你需要控制每個Border的邊框,並且在一堆Bordr中找到Grid.Row,Grid. ...
  • .NET C#程式啟動閃退,目錄導致的問題 這是第2次踩這個坑了,很小的編程細節,容易忽略,所以寫個博客,分享給大家。 1.第一次坑:是windows 系統把程式運行成服務,找不到配置文件,原因是以服務運行它的工作目錄是在C:\Windows\System32 2.本次坑:WPF桌面程式通過註冊表設 ...
  • 在分散式系統中,數據的持久化是至關重要的一環。 Orleans 7 引入了強大的持久化功能,使得在分散式環境下管理數據變得更加輕鬆和可靠。 本文將介紹什麼是 Orleans 7 的持久化,如何設置它以及相應的代碼示例。 什麼是 Orleans 7 的持久化? Orleans 7 的持久化是指將 Or ...
  • 前言 .NET Feature Management 是一個用於管理應用程式功能的庫,它可以幫助開發人員在應用程式中輕鬆地添加、移除和管理功能。使用 Feature Management,開發人員可以根據不同用戶、環境或其他條件來動態地控制應用程式中的功能。這使得開發人員可以更靈活地管理應用程式的功 ...
  • 在 WPF 應用程式中,拖放操作是實現用戶交互的重要組成部分。通過拖放操作,用戶可以輕鬆地將數據從一個位置移動到另一個位置,或者將控制項從一個容器移動到另一個容器。然而,WPF 中預設的拖放操作可能並不是那麼好用。為瞭解決這個問題,我們可以自定義一個 Panel 來實現更簡單的拖拽操作。 自定義 Pa ...
  • 在實際使用中,由於涉及到不同編程語言之間互相調用,導致C++ 中的OpenCV與C#中的OpenCvSharp 圖像數據在不同編程語言之間難以有效傳遞。在本文中我們將結合OpenCvSharp源碼實現原理,探究兩種數據之間的通信方式。 ...
  • 一、前言 這是一篇搭建許可權管理系統的系列文章。 隨著網路的發展,信息安全對應任何企業來說都越發的重要,而本系列文章將和大家一起一步一步搭建一個全新的許可權管理系統。 說明:由於搭建一個全新的項目過於繁瑣,所有作者將挑選核心代碼和核心思路進行分享。 二、技術選擇 三、開始設計 1、自主搭建vue前端和. ...
  • Csharper中的表達式樹 這節課來瞭解一下表示式樹是什麼? 在C#中,表達式樹是一種數據結構,它可以表示一些代碼塊,如Lambda表達式或查詢表達式。表達式樹使你能夠查看和操作數據,就像你可以查看和操作代碼一樣。它們通常用於創建動態查詢和解析表達式。 一、認識表達式樹 為什麼要這樣說?它和委托有 ...
  • 在使用Django等框架來操作MySQL時,實際上底層還是通過Python來操作的,首先需要安裝一個驅動程式,在Python3中,驅動程式有多種選擇,比如有pymysql以及mysqlclient等。使用pip命令安裝mysqlclient失敗應如何解決? 安裝的python版本說明 機器同時安裝了 ...