看源碼學編程系列之kafka(一)

来源:https://www.cnblogs.com/pangjia/archive/2019/11/22/11914456.html
-Advertisement-
Play Games

kafka 由於它自身的高性能發送與消費能力,而受到廣大企業的喜歡,所以我們就先看看kafka 一些源碼實現如下: 這段代碼摘抄的是,kafka源碼 生產者發送消息demo(kafka.examples.Producer) 裡面的一個片段,主要是涉及到兩個知識點,一個是非同步發送消息, 回調函數的實現 ...


  kafka 由於它自身的高性能發送與消費能力,而受到廣大企業的喜歡,所以我們就先看看kafka 一些源碼實現如下:  

 1  public void run() {
 2         int messageNo = 1;
 3         while (true) {
 4             String messageStr = "Message_" + messageNo;
 5             long startTime = System.currentTimeMillis();
 6             if (isAsync) { 
 7                 producer.send(new ProducerRecord<>(topic,
 8                     messageNo,
 9                     messageStr), new DemoCallBack(startTime, messageNo, messageStr));// 非同步發送
10             } else { 
11                 try {
12                     producer.send(new ProducerRecord<>(topic,
13                         messageNo,
14                         messageStr)).get();// 同步發送
15                     System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
16                 } catch (InterruptedException | ExecutionException e) {
17                     e.printStackTrace();
18                 }
19             }
20             ++messageNo;
21         }
22     }

  這段代碼摘抄的是,kafka源碼 生產者發送消息demo(kafka.examples.Producer) 裡面的一個片段,主要是涉及到兩個知識點,一個是非同步發送消息,

回調函數的實現,另一個就是同步發送,多線程Future.get 模式的實現。現在分別闡述這兩種實現方式。

  非同步回調方式

  其實這種方式主要應用在調用多線程執行某個任務時,不用傻傻等到該線程完成後得到相應的反饋信息。舉個例子Client端需要調用Server端來執行某個任務,並且希望Server端執行完成後

主動將相應的結果告訴Client端。這個過程就叫做回調了。如下代碼:

  

 1 public class Client implements CSCallBack {
 2     
 3     private volatile boolean stopThread = false;
 4     private Server server;
 5 
 6     public Client(Server server) {
 7         this.server = server;
 8     }
 9 
10     public void sendMsg(final String msg){
11         System.out.println("ThreadName="+Thread.currentThread().getName()+" 客戶端:發送的消息為:" + msg);
12         new Thread(new Runnable() {
13             @Override
14             public void run() {
15                 server.getClientMsg(Client.this,msg);// 核心代碼1:將被調用方自己當作參數(client)傳遞到調用方(Server)
16                 
17                 while(!stopThread) {// 模擬等待另伺服器端代碼完成
18                     System.out.println("ThreadName="+Thread.currentThread().getName()+"客戶端:模擬等待回調完成");
19                     
20                     try {
21                         Thread.sleep(50);
22                     } catch (InterruptedException e) {
23                         e.printStackTrace();
24                     }
25                 }
26             }
27         }).start();
28         System.out.println("ThreadName="+Thread.currentThread().getName()+" 客戶端:非同步發送成功");
29     }
30 
31     @Override
32     public void process(String status) {
33         stopThread = true;
34         System.out.println("ThreadName="+Thread.currentThread().getName()+" 客戶端:收到服務端回調狀態為:" + status);
35     }
36 }

 

 1 public class Server {
 2 
 3     public void getClientMsg(CSCallBack csCallBack , String msg) {
 4         
 5 
 6         // 模擬服務端需要對數據處理
 7         try {
 8              new Thread(new Runnable() {
 9                  @Override
10                  public void run() {
11                      System.out.println("ThreadName="+Thread.currentThread().getName()+" 服務端:服務端接收到客戶端發送的消息為:" + msg);
12                      while(true) {
13                         int max=10,min=1;
14                          int ranNum = (int) (Math.random()*(max-min)+min); 
15                          
16                          if(ranNum >6) {//  當隨機數大於5時認為任務完成
17                               System.out.println("ThreadName="+Thread.currentThread().getName()+" 服務端:數據處理成功,返回成功狀態 200");
18                              String status = "200";
19                              csCallBack.process(status);// 核心代碼2:調用方(Server)任務處理完成相應的任務後,調用被調用方(Client)的方法告知任務完成
20                              break;
21                          }
22                          
23                          try {
24                             Thread.sleep(80);
25                         } catch (InterruptedException e) {
26                             e.printStackTrace();
27                         }
28                      }
29                  }
30              }).start();
31             
32         } catch (Exception e) {
33             e.printStackTrace();
34         }
35        
36     }
37 }

         

其實核心代碼就兩個:

      client端:被調用方自己當作參數(client)傳遞到調用方(Server)。

      Server端:調用方(Server)任務處理完成相應的任務後,調用被調用方(Client)的方法告知任務完成。

   同步發送多線程 Future.get 模式實現 

        這種方式方式主要是用來等待某一項任務完成後,接著順序執行某項任務。和上面的例子一樣都是client 端 向server 端請求完成某項任務,並且期望server 端在完成任務後,返回結果

  實例代碼如下:

  

 1 public class FutureDemo {
 2 
 3     protected RealData realdata = null;
 4     protected boolean isReady = false;
 5     public synchronized void requestData(RealData realdata) {// client請求server完成某項任務
 6         if (isReady) {                        
 7             return;     
 8         }
 9         this.realdata = realdata;
10         isReady = true;
11         notifyAll();//核心代碼2:當請求的任務處理完成時,喚醒等待中的線程
12     }
13     
14     public synchronized String getResult() {// client等待server完成任務後返回,此處就相當於 Future.get 
15         while (!isReady) {
16             try {
17                 wait();//核心代碼1:發出請求後等待線程被激活
18             } catch (InterruptedException e) {
19             }
20         }
21         return realdata.result;
22     }
23 }

  核心實現代碼其實就是多線程裡面的,wait 和 notify 實現方式。非同步回調 和 同步 Future get 模式最大的區別,舉個例子吧,

              老婆(client 端)很愛老公,老公(伺服器端)每天完成加班很晚,老婆都會等到老公回家然後給他做夜宵(同步 Future get 模式)

             老婆(client 端)很愛老公,老公(伺服器端)每天完成加班很晚,老婆覺得一直等太累了,就先睡覺,等老公回來後通知老婆(回調),然後老婆再給老公做夜宵(非同步回調方式)。

 

      所以大家都期望自己的老婆是, Future get 模式 還是 非同步回調模式?

              

            

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 如果不知道 Jimu(積木) 是啥,請移步 ".Net Core 分散式微服務框架介紹 Jimu" 這次升級除了支持 .Net Core 3.0 還新增部分功能,如 REST, 鏈路跟蹤等,以下為詳細; 一、功能列表 | 功能 | 說明 |Jimu 1.0.0 | Jimu 0.6.0| | | | ...
  • 轉自:https://www.cnblogs.com/lidabo/p/9134174.html 此處僅供學習,版權屬原作者; 作為一個圖形圖像方向的研究生,我經常都在和 OpenGL 、OpenCV 等多種 C++ 庫打交道。這些庫遵循著不同的規則和用法;另外,為了讓自己的程式具有更多的交互能力, ...
  • NCNN是騰訊開源的一個為手機端極致優化的高性能神經網路前向計算框架。在AOE開源工程里,我們提供了NCNN組件,下麵我們以SqueezeNet物體識別這個Sample為例,來講一講NCNN組件的設計和用法。 ...
  • 第二章 Spring框架基礎 面向介面編程的設計方法 ​ 在上一章中,我們看到了一個依賴於其他類的POJO類包含了對其依賴項的具體類的引用。例如,FixedDepositController 類包含 對 FixedDepositService 類的引用,FixedDepositService 類包含 ...
  • 1 class filter{ 2 public String name(){ 3 return getClass().getSimpleName(); 4 } 5 public String process(String s){ 6 return s; 7 } 8 class filter1 ex ...
  • 本文分四個步驟來詳講如何用PyInstaller將py文件打包成exe文件 1. PyInstaller 簡介 2. PyInstaller 安裝 3. 將py文件打包成exe文件 4. PyInstaller打包常見問題 ...
  • 基於WEB的網上購物系統主要功能包括:前臺用戶登錄退出、註冊、線上購物、修改個人信息、後臺商品管理等等。本系統結構如下:(1)商品瀏覽模塊: 實現瀏覽最新商品 實現按商品名稱瀏覽商品 實現根據商品分類瀏覽商品(2)購物車: 登錄後可以將商品加入購物車,或從購物車移除商品(3)登錄、註冊: 購物前需要 ...
  • 1.LinkedHashSet 的概述和使用 llinkedHashSet 的特點: 是唯一能保證怎麼存就怎麼輸出的 set 集合,並且去重覆 1 LinkedHashSet<String> linkSet = new LinkedHashSet<>(); 2 /* LinkedHashSet可以保 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...