kafka 由於它自身的高性能發送與消費能力,而受到廣大企業的喜歡,所以我們就先看看kafka 一些源碼實現如下: 這段代碼摘抄的是,kafka源碼 生產者發送消息demo(kafka.examples.Producer) 裡面的一個片段,主要是涉及到兩個知識點,一個是非同步發送消息, 回調函數的實現 ...
kafka 由於它自身的高性能發送與消費能力,而受到廣大企業的喜歡,所以我們就先看看kafka 一些源碼實現如下:
1 public void run() { 2 int messageNo = 1; 3 while (true) { 4 String messageStr = "Message_" + messageNo; 5 long startTime = System.currentTimeMillis(); 6 if (isAsync) { 7 producer.send(new ProducerRecord<>(topic, 8 messageNo, 9 messageStr), new DemoCallBack(startTime, messageNo, messageStr));// 非同步發送 10 } else { 11 try { 12 producer.send(new ProducerRecord<>(topic, 13 messageNo, 14 messageStr)).get();// 同步發送 15 System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); 16 } catch (InterruptedException | ExecutionException e) { 17 e.printStackTrace(); 18 } 19 } 20 ++messageNo; 21 } 22 }
這段代碼摘抄的是,kafka源碼 生產者發送消息demo(kafka.examples.Producer) 裡面的一個片段,主要是涉及到兩個知識點,一個是非同步發送消息,
回調函數的實現,另一個就是同步發送,多線程Future.get 模式的實現。現在分別闡述這兩種實現方式。
非同步回調方式
其實這種方式主要應用在調用多線程執行某個任務時,不用傻傻等到該線程完成後得到相應的反饋信息。舉個例子Client端需要調用Server端來執行某個任務,並且希望Server端執行完成後
主動將相應的結果告訴Client端。這個過程就叫做回調了。如下代碼:
1 public class Client implements CSCallBack { 2 3 private volatile boolean stopThread = false; 4 private Server server; 5 6 public Client(Server server) { 7 this.server = server; 8 } 9 10 public void sendMsg(final String msg){ 11 System.out.println("ThreadName="+Thread.currentThread().getName()+" 客戶端:發送的消息為:" + msg); 12 new Thread(new Runnable() { 13 @Override 14 public void run() { 15 server.getClientMsg(Client.this,msg);// 核心代碼1:將被調用方自己當作參數(client)傳遞到調用方(Server) 16 17 while(!stopThread) {// 模擬等待另伺服器端代碼完成 18 System.out.println("ThreadName="+Thread.currentThread().getName()+"客戶端:模擬等待回調完成"); 19 20 try { 21 Thread.sleep(50); 22 } catch (InterruptedException e) { 23 e.printStackTrace(); 24 } 25 } 26 } 27 }).start(); 28 System.out.println("ThreadName="+Thread.currentThread().getName()+" 客戶端:非同步發送成功"); 29 } 30 31 @Override 32 public void process(String status) { 33 stopThread = true; 34 System.out.println("ThreadName="+Thread.currentThread().getName()+" 客戶端:收到服務端回調狀態為:" + status); 35 } 36 }
1 public class Server { 2 3 public void getClientMsg(CSCallBack csCallBack , String msg) { 4 5 6 // 模擬服務端需要對數據處理 7 try { 8 new Thread(new Runnable() { 9 @Override 10 public void run() { 11 System.out.println("ThreadName="+Thread.currentThread().getName()+" 服務端:服務端接收到客戶端發送的消息為:" + msg); 12 while(true) { 13 int max=10,min=1; 14 int ranNum = (int) (Math.random()*(max-min)+min); 15 16 if(ranNum >6) {// 當隨機數大於5時認為任務完成 17 System.out.println("ThreadName="+Thread.currentThread().getName()+" 服務端:數據處理成功,返回成功狀態 200"); 18 String status = "200"; 19 csCallBack.process(status);// 核心代碼2:調用方(Server)任務處理完成相應的任務後,調用被調用方(Client)的方法告知任務完成 20 break; 21 } 22 23 try { 24 Thread.sleep(80); 25 } catch (InterruptedException e) { 26 e.printStackTrace(); 27 } 28 } 29 } 30 }).start(); 31 32 } catch (Exception e) { 33 e.printStackTrace(); 34 } 35 36 } 37 }
其實核心代碼就兩個:
client端:被調用方自己當作參數(client)傳遞到調用方(Server)。
Server端:調用方(Server)任務處理完成相應的任務後,調用被調用方(Client)的方法告知任務完成。
同步發送多線程 Future.get 模式實現
這種方式方式主要是用來等待某一項任務完成後,接著順序執行某項任務。和上面的例子一樣都是client 端 向server 端請求完成某項任務,並且期望server 端在完成任務後,返回結果
實例代碼如下:
1 public class FutureDemo { 2 3 protected RealData realdata = null; 4 protected boolean isReady = false; 5 public synchronized void requestData(RealData realdata) {// client請求server完成某項任務 6 if (isReady) { 7 return; 8 } 9 this.realdata = realdata; 10 isReady = true; 11 notifyAll();//核心代碼2:當請求的任務處理完成時,喚醒等待中的線程 12 } 13 14 public synchronized String getResult() {// client等待server完成任務後返回,此處就相當於 Future.get 15 while (!isReady) { 16 try { 17 wait();//核心代碼1:發出請求後等待線程被激活 18 } catch (InterruptedException e) { 19 } 20 } 21 return realdata.result; 22 } 23 }
核心實現代碼其實就是多線程裡面的,wait 和 notify 實現方式。非同步回調 和 同步 Future get 模式最大的區別,舉個例子吧,
老婆(client 端)很愛老公,老公(伺服器端)每天完成加班很晚,老婆都會等到老公回家然後給他做夜宵(同步 Future get 模式)
老婆(client 端)很愛老公,老公(伺服器端)每天完成加班很晚,老婆覺得一直等太累了,就先睡覺,等老公回來後通知老婆(回調),然後老婆再給老公做夜宵(非同步回調方式)。
所以大家都期望自己的老婆是, Future get 模式 還是 非同步回調模式?