一、Master-Worker設計模式 Master-Worker模式是常用的並行設計模式。它的核心思想是,系統有兩個進程協議工作:Master進程和Worker進程。Master進程負責接收和分配任務,Worker進程負責處理子任務。當各個Worker進程將子任務處理完後,將結果返回給Master ...
一、Master-Worker設計模式
Master-Worker模式是常用的並行設計模式。它的核心思想是,系統有兩個進程協議工作:Master進程和Worker進程。Master進程負責接收和分配任務,Worker進程負責處理子任務。當各個Worker進程將子任務處理完後,將結果返回給Master進程,由Master進行歸納和彙總,從而得到系統結果。
Master-Worker模式的好處是,它能將大任務分解成若幹個小任務,併發執行,從而提高系統性能。而對於系統請求者Client來說,任務一旦提交,Master進程就會立刻分配任務並立即返回,並不會等系統處理完全部任務再返回,其處理過程是非同步的。
二、Master-Worker設計模式代碼實現
1、創建Task任務對象
1 package com.ietree.basicskill.mutilthread.designpattern.masterworker; 2 3 /** 4 * Created by Root on 5/12/2017. 5 */ 6 public class Task { 7 8 private int id; 9 10 private String name; 11 12 private int price; 13 14 public int getId() { 15 return id; 16 } 17 18 public void setId(int id) { 19 this.id = id; 20 } 21 22 public String getName() { 23 return name; 24 } 25 26 public void setName(String name) { 27 this.name = name; 28 } 29 30 public int getPrice() { 31 return price; 32 } 33 34 public void setPrice(int price) { 35 this.price = price; 36 } 37 }
2、實現Worker對象
1 package com.ietree.basicskill.mutilthread.designpattern.masterworker; 2 3 import java.util.concurrent.ConcurrentHashMap; 4 import java.util.concurrent.ConcurrentLinkedQueue; 5 6 /** 7 * Created by Root on 5/12/2017. 8 */ 9 public class Worker implements Runnable { 10 11 private ConcurrentLinkedQueue<Task> workQueue; 12 private ConcurrentHashMap<String, Object> resultMap; 13 14 public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) { 15 this.workQueue = workQueue; 16 } 17 18 public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { 19 this.resultMap = resultMap; 20 } 21 22 @Override 23 public void run() { 24 while (true) { 25 Task input = this.workQueue.poll(); 26 if (input == null) { 27 break; 28 } 29 // 真正的去做業務處理 30 //Object output = handle(input); 31 // 改造 32 Object output = MyWorker.handle(input); 33 // 返回處理結果集 34 this.resultMap.put(Integer.toString(input.getId()), output); 35 } 36 } 37 38 // private Object handle(Task input) { 39 // Object output = null; 40 // try { 41 // // 表示處理task任務的耗時,可能是數據的加工,也可能是操作資料庫...... 42 // Thread.sleep(500); 43 // output = input.getPrice(); 44 // } catch (InterruptedException e) { 45 // e.printStackTrace(); 46 // } 47 // return output; 48 // } 49 50 // 優化,考慮讓繼承類去自己實現具體的業務處理 51 public static Object handle(Task input) { 52 return null; 53 } 54 55 }
3、為了使程式更靈活,將具體的業務執行邏輯抽離,在具體的Worker對象去實現,如這裡的MyWorker對象
1 package com.ietree.basicskill.mutilthread.designpattern.masterworker; 2 3 /** 4 * Created by Root on 5/13/2017. 5 */ 6 public class MyWorker extends Worker { 7 8 public static Object handle(Task input) { 9 Object output = null; 10 try { 11 // 表示處理task任務的耗時,可能是數據的加工,也可能是操作資料庫...... 12 Thread.sleep(500); 13 output = input.getPrice(); 14 } catch (InterruptedException e) { 15 e.printStackTrace(); 16 } 17 return output; 18 } 19 20 }
4、Master類
1 package com.ietree.basicskill.mutilthread.designpattern.masterworker; 2 3 import java.util.HashMap; 4 import java.util.Map; 5 import java.util.concurrent.ConcurrentHashMap; 6 import java.util.concurrent.ConcurrentLinkedQueue; 7 8 /** 9 * Created by Root on 5/12/2017. 10 */ 11 public class Master { 12 13 // 1、使用一個ConcurrentLinkedQueue集合來裝載所有需要執行的任務 14 private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); 15 16 // 2、使用HashMap來裝載所有的worker對象 17 private HashMap<String, Thread> workers = new HashMap<String, Thread>(); 18 19 // 3、使用一個容器承裝每一個worker併發執行任務的結果集 20 private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); 21 22 // 4、構造方法 23 public Master(Worker worker, int workerCount) { 24 // 每一個worker對象都需要有Master的引用,workQueue用於任務的領取,resultMap用於任務的提交 25 worker.setWorkerQueue(this.workQueue); 26 worker.setResultMap(this.resultMap); 27 28 for (int i = 0; i < workerCount; i++) { 29 workers.put("子節點" + Integer.toString(i), new Thread(worker)); 30 } 31 } 32 33 // 5、提交方法 34 public void submit(Task task) { 35 this.workQueue.add(task); 36 } 37 38 // 6、需要有一個執行的方法(啟動應用程式,讓所有的worker工作) 39 public void execute() { 40 for (Map.Entry<String, Thread> me : workers.entrySet()) { 41 me.getValue().start(); 42 } 43 } 44 45 // 7、判斷線程是否執行完畢 46 public boolean isComplete() { 47 for (Map.Entry<String, Thread> me : workers.entrySet()) { 48 // 判斷所有的線程狀態是否屬於已停止狀態 49 if (me.getValue().getState() != Thread.State.TERMINATED) { 50 return false; 51 } 52 } 53 return true; 54 } 55 56 // 8、返回結果集數據 57 public int getResult() { 58 int ret = 0; 59 for (Map.Entry<String, Object> me : resultMap.entrySet()) { 60 // 彙總邏輯 61 ret += (Integer) me.getValue(); 62 } 63 return ret; 64 } 65 66 }
5、測試,具體調用實現
1 package com.ietree.basicskill.mutilthread.designpattern.masterworker; 2 3 import java.util.Random; 4 5 /** 6 * Created by Root on 5/13/2017. 7 */ 8 public class MasterWorkerTest { 9 10 public static void main(String[] args) { 11 12 // Master master = new Master(new Worker(), 10); 13 // 改造 14 // Master master = new Master(new MyWorker(), 10); 15 // 改造(獲取當前機器可用線程數) 16 System.out.println("我的機器可用Processors數量:" + Runtime.getRuntime().availableProcessors()); 17 Master master = new Master(new MyWorker(), Runtime.getRuntime().availableProcessors()); 18 19 Random r = new Random(); 20 for (int i = 1; i <= 10; i++) { 21 Task t = new Task(); 22 t.setId(i); 23 t.setName("任務" + i); 24 t.setPrice(r.nextInt(1000)); 25 master.submit(t); 26 } 27 master.execute(); 28 29 long start = System.currentTimeMillis(); 30 31 while (true) { 32 if (master.isComplete()) { 33 long end = System.currentTimeMillis() - start; 34 int ret = master.getResult(); 35 System.out.println("最終的結果:" + ret + ",執行耗時:" + end); 36 break; 37 } 38 } 39 } 40 41 }
程式輸出:
我的機器可用Processors數量:20
最終的結果:4473,執行耗時:500
從上面的運行結果來看,程式最終執行時間幾乎就等於一個線程單獨運行的時間,在此註意的是,同時執行的線程數是根據你執行此程式的機器配置決定的。