Micronaut 微服務中使用 Kafka

来源:https://www.cnblogs.com/springforall/archive/2019/09/30/11610643.html
-Advertisement-
Play Games

今天,我們將通過 topic構建一些彼此非同步通信的微服務。我們使用 框架,它為與 集成提供專門的庫。讓我們簡要介紹一下示例系統的架構。我們有四個微型服務: ,`行程服務 司機服務 乘客服務 Kafka`實例。 我們系統的主要目標是為客戶安排行程。訂單服務應用程式還充當網關。它接收來自客戶的請求,保存 ...


今天,我們將通過Apache Kafkatopic構建一些彼此非同步通信的微服務。我們使用Micronaut框架,它為與Kafka集成提供專門的庫。讓我們簡要介紹一下示例系統的架構。我們有四個微型服務:訂單服務行程服務司機服務乘客服務。這些應用程式的實現非常簡單。它們都有記憶體存儲,並連接到同一個Kafka實例。

我們系統的主要目標是為客戶安排行程。訂單服務應用程式還充當網關。它接收來自客戶的請求,保存歷史記錄並將事件發送到orderstopic。所有其他微服務都在監聽orders這個topic,並處理order-service發送的訂單。每個微服務都有自己的專用topic,其中發送包含更改信息的事件。此類事件由其他一些微服務接收。架構如下圖所示。

img

在閱讀本文之前,有必要熟悉一下Micronaut框架。您可以閱讀之前的一篇文章,該文章描述了通過REST API構建微服務通信的過程:使用microaut框架構建微服務的快速指南

1. 運行Kafka

要在本地機器上運行Apache Kafka,我們可以使用它的Docker映像。最新的鏡像是由https://hub.docker.com/u/wurstmeister共用的。在啟動Kafka容器之前,我們必須啟動kafka所用使用的ZooKeeper伺服器。如果在Windows上運行Docker,其虛擬機的預設地址是192.168.99.100。它還必須設置為Kafka容器的環境。

ZookeeperKafka容器都將在同一個網路中啟動。在docker中運行Zookeeper以zookeeper的名稱提供服務,併在暴露2181埠。Kafka容器需要在環境變數使用KAFKA_ZOOKEEPER_CONNECT的地址。

$ docker network create kafka
$ docker run -d --name zookeeper --network kafka -p 2181:2181 wurstmeister/zookeeper
$ docker run -d --name kafka -p 9092:9092 --network kafka --env KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 wurstmeister/kafka

2. 引入Micronaut Kafka依賴

使用Kafka構建的microaut應用程式可以在HTTP伺服器存在的情況下啟動,也可以在不存在HTTP伺服器的情況下啟動。要啟用Micronaut Kafka,需要添加micronaut-kafka庫到依賴項。如果您想暴露HTTP API,您還應該添加micronaut-http-server-netty:

<dependency>
    <groupId>io.micronaut.configuration</groupId>
    <artifactId>micronaut-kafka</artifactId>
</dependency>
<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-http-server-netty</artifactId>
</dependency>

3. 構建訂單微服務

訂單微服務是唯一一個啟動嵌入式HTTP伺服器並暴露REST API的應用程式。這就是為什麼我們可以為Kafka提供內置Micronaut健康檢查。要做到這一點,我們首先應該添加micronaut-management依賴:

<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-management</artifactId>
</dependency>

為了方便起見,我們將通過在application.yml中定義以下配置來啟用所有管理端點並禁用它們的HTTP身份驗證。

endpoints:
  all:
    enabled: true
    sensitive: false

現在,可以在地址http://localhost:8080/health下使用health check。我們的示例應用程式還將暴露添加新訂單列出所有以前創建的訂單的簡單REST API。下麵是暴露這些端點的Micronaut控制器實現:

@Controller("orders")
public class OrderController {

    @Inject
    OrderInMemoryRepository repository;
    @Inject
    OrderClient client;

    @Post
    public Order add(@Body Order order) {
        order = repository.add(order);
        client.send(order);
        return order;
    }

    @Get
    public Set<Order> findAll() {
        return repository.findAll();
    }

}

每個微服務都使用記憶體存儲庫實現。以下是訂單微服務(Order-Service)中的存儲庫實現:

@Singleton
public class OrderInMemoryRepository {

    private Set<Order> orders = new HashSet<>();

    public Order add(Order order) {
        order.setId((long) (orders.size() + 1));
        orders.add(order);
        return order;
    }

    public void update(Order order) {
        orders.remove(order);
        orders.add(order);
    }

    public Optional<Order> findByTripIdAndType(Long tripId, OrderType type) {
        return orders.stream().filter(order -> order.getTripId().equals(tripId) && order.getType() == type).findAny();
    }

    public Optional<Order> findNewestByUserIdAndType(Long userId, OrderType type) {
        return orders.stream().filter(order -> order.getUserId().equals(userId) && order.getType() == type)
                .max(Comparator.comparing(Order::getId));
    }

    public Set<Order> findAll() {
        return orders;
    }

}

記憶體存儲庫存儲Order對象實例。Order對象還被髮送到名為orders的Kafkatopic。下麵是Order類的實現:

public class Order {

    private Long id;
    private LocalDateTime createdAt;
    private OrderType type;
    private Long userId;
    private Long tripId;
    private float currentLocationX;
    private float currentLocationY;
    private OrderStatus status;

    // ... GETTERS AND SETTERS
}

4. 使用Kafka非同步通信

現在,讓我們想一個可以通過示例系統實現的用例——添加新的行程

我們創建了OrderType.NEW_TRIP類型的新訂單。在此之後,(1)訂單服務創建一個訂單並將其發送到orderstopic。訂單由三個微服務接收:司機服務乘客服務行程服務
(2)所有這些應用程式都處理這個新訂單。乘客服務應用程式檢查乘客帳戶上是否有足夠的資金。如果沒有,它就取消了行程,否則什麼也做不了。司機服務正在尋找最近可用的司機,(3)行程服務創建和存儲新的行程。司機服務行程服務都將事件發送到它們的topic(drivers, trips),其中包含相關更改的信息。

每一個事件可以被其他microservices訪問,例如,(4)行程服務偵聽來自司機服務的事件,以便為行程分配一個新的司機

下圖說明瞭在添加新的行程時,我們的微服務之間的通信過程。
在這裡插入圖片描述現在,讓我們繼續討論實現細節。

4.1. 發送訂單

首先,我們需要創建Kafka 客戶端,負責向topic發送消息。我們創建的一個介面,命名為OrderClient,為它添加@KafkaClient並聲明用於發送消息的一個或多個方法。每個方法都應該通過@Topic註解設置目標topic名稱。對於方法參數,我們可以使用三個註解@KafkaKey@Body@Header@KafkaKey用於分區,這是我們的示例應用程式所需要的。在下麵可用的客戶端實現中,我們只使用@Body註解。

@KafkaClient
public interface OrderClient {

    @Topic("orders")
    void send(@Body Order order);

}

4.2. 接收訂單

一旦客戶端發送了一個訂單,它就會被監聽orderstopic的所有其他微服務接收。下麵是司機服務中的監聽器實現。監聽器類OrderListener應該添加@KafkaListener註解。我們可以聲明groupId作為一個註解參數,以防止單個應用程式的多個實例接收相同的消息。然後,我們聲明用於處理傳入消息的方法。與客戶端方法相同,應該通過@Topic註解設置目標topic名稱,因為我們正在監聽Order對象,所以應該使用@Body註解——與對應的客戶端方法相同。

@KafkaListener(groupId = "driver")
public class OrderListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

    private DriverService service;

    public OrderListener(DriverService service) {
        this.service = service;
    }

    @Topic("orders")
    public void receive(@Body Order order) {
        LOGGER.info("Received: {}", order);
        switch (order.getType()) {
            case NEW_TRIP -> service.processNewTripOrder(order);
        }
    }

}

4.3. 發送到其他topic

現在,讓我們看一下司機服務中的processNewTripOrder方法。DriverService註入兩個不同的Kafka Client
bean: OrderClientDriverClient。當處理新訂單時,它將試圖尋找與發送訂單的乘客最近的司機。找到他之後,將該司機的狀態更改為UNAVAILABLE,並將帶有Driver對象的事件發送到driverstopic。

@Singleton
public class DriverService {

    private static final Logger LOGGER = LoggerFactory.getLogger(DriverService.class);

    private DriverClient client;
    private OrderClient orderClient;
    private DriverInMemoryRepository repository;

    public DriverService(DriverClient client, OrderClient orderClient, DriverInMemoryRepository repository) {
        this.client = client;
        this.orderClient = orderClient;
        this.repository = repository;
    }

    public void processNewTripOrder(Order order) {
        LOGGER.info("Processing: {}", order);
        Optional<Driver> driver = repository.findNearestDriver(order.getCurrentLocationX(), order.getCurrentLocationY());
        driver.ifPresent(driverLocal -> {
            driverLocal.setStatus(DriverStatus.UNAVAILABLE);
            repository.updateDriver(driverLocal);
            client.send(driverLocal, String.valueOf(order.getId()));
            LOGGER.info("Message sent: {}", driverLocal);
        });
    }

    // ...
}

這是Kafka Client司機服務中的實現,用於向drivertopic發送消息。因為我們需要將DriverOrder 關聯起來,所以我們使用@Header註解 的orderId參數。沒有必要把它包括到Driver類中,將其分配給監聽器端的正確行程。

@KafkaClient
public interface DriverClient {

    @Topic("drivers")
    void send(@Body Driver driver, @Header("Order-Id") String orderId);

}

4.4. 服務間通信

DriverListener收到@KafkaListener行程服務中聲明。它監聽傳入到triptopic。接收方法的參數和客戶端發送方法的類似,如下所示:

@KafkaListener(groupId = "trip")
public class DriverListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

    private TripService service;

    public DriverListener(TripService service) {
        this.service = service;
    }

    @Topic("drivers")
    public void receive(@Body Driver driver, @Header("Order-Id") String orderId) {
        LOGGER.info("Received: driver->{}, header->{}", driver, orderId);
        service.processNewDriver(driver);
    }

}

最後一步,將orderId查詢到的行程TripdriverId關聯,這樣整個流程就結束。

@Singleton
public class TripService {

    private static final Logger LOGGER = LoggerFactory.getLogger(TripService.class);

    private TripInMemoryRepository repository;
    private TripClient client;

    public TripService(TripInMemoryRepository repository, TripClient client) {
        this.repository = repository;
        this.client = client;
    }


    public void processNewDriver(Driver driver, String orderId) {
        LOGGER.info("Processing: {}", driver);
        Optional<Trip> trip = repository.findByOrderId(Long.valueOf(orderId));
        trip.ifPresent(tripLocal -> {
            tripLocal.setDriverId(driver.getId());
            repository.update(tripLocal);
        });
    }

    // ... OTHER METHODS

}

5. 跟蹤

我們可以使用Micronaut Kafka輕鬆地啟用分散式跟蹤。首先,我們需要啟用和配置Micronaut跟蹤。要做到這一點,首先應該添加一些依賴項:

<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-tracing</artifactId>
</dependency>
<dependency>
    <groupId>io.zipkin.brave</groupId>
    <artifactId>brave-instrumentation-http</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-reporter</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.opentracing.brave</groupId>
    <artifactId>brave-opentracing</artifactId>
</dependency>
<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-client</artifactId>
    <version>0.0.16</version>
    <scope>runtime</scope>
</dependency>

我們還需要在application.yml配置文件中,配置Zipkin 的追蹤的地址等。

tracing:
  zipkin:
    enabled: true
    http:
      url: http://192.168.99.100:9411
    sampler:
      probability: 1

在啟動應用程式之前,我們必須運行Zipkin容器:

$ docker run -d --name zipkin -p 9411:9411 openzipkin/zipkin

6. 總結

在本文中,您將瞭解通過Apache Kafka使用非同步通信構建微服務架構的過程。我已經向大家展示了Microaut Kafka庫最重要的特性,它允許您輕鬆地聲明Kafkatopic的生產者和消費者,為您的微服務啟用健康檢查分散式跟蹤。我已經為我們的系統描述了一個簡單的場景的實現,包括根據客戶的請求添加一個新的行程。本示例系統的整體實現,請查看GitHub上的源代碼

原文鏈接:https://piotrminkowski.wordpress.com/2019/08/06/kafka-in-microservices-with-micronaut/

作者:Piotr's

譯者:李東


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 單例模式(singleton):是JAVA中最簡單的一種設計模式,屬於創建型模式。所謂單例,就是整個程式有且僅有一個實例。 特點: 構造方法私有化 在本類中實例化一個對象作為本類的屬性 對外提供一個訪問本類對象的方法 餓漢式:類載入時就載入對象 應用場景:小對象,頻繁用,高併發 特點:線程安全,比較 ...
  • 一 會話跟蹤 我們需要先瞭解一下什麼是會話!可以把會話理解為客戶端與伺服器之間的一次會晤,在一次會晤中可能會包含多次請求和響應。例如你給10086打個電話,你就是客戶端,而10086服務人員就是伺服器了。從雙方接通電話那一刻起,會話就開始了,到某一方掛斷電話表示會話結束。在通話過程中,你會向1008 ...
  • 1.面向對象的思想優點 優點: 簡化代碼 ,構建公共模板 ,擴展性強 思想: 類作為模板 ,對象通過模板實例化對象 ,對象去做事 ,抽象將顯示存在的事物使用代碼體現 2.三大特性 封裝(狹義) : 對屬性的封裝 ,對方法的封裝 繼承 1.子類擁有父類除私有內容外所有 2.對象通過子類-父類-...找 ...
  • 組合模式(Composite): 定義: 組合模式又叫部分整體模式,它是一種將對象組合成樹狀的層次結構模式,用來表示"部分-整體"的關係,使用戶對單個對象和組合對象具有一致的訪問性。 組合模式的角色: 1)抽象構建(Component):它的主要作用是為樹葉構件和樹枝構件聲明公共介面,並實現它們的默 ...
  • 作為程式員,你肯定遇到過NullPointerException, 這個異常對於初出茅廬的新人, 還是久經江湖的老手都是不可避免的痛, 可又是那麼的無能為力,為瞭解決它,你只能在使用某個值之前,對其進行判空處理。然而這樣會使得代碼變得臃腫不堪。幸好jdk8引入了optional來處理了null的問題 ...
  • 創建一個測試用的微服務項目HelloWorld 創建項目 編寫服務代碼 編輯配置文件application.properties: 測試運行 源碼 "Github倉庫:https://github.com/sunweisheng/spring cloud example" ...
  • 目錄: 一.字元編碼 二.字元串格式化 三.進位轉換 四.數據類型及其操作 五.字元串轉換 六.列表 七.元組 八.字典 一.字元編碼: 電腦由美國人發明,最早的字元編碼為ASCII,只規定了英文字母數字和一些特殊字元與數字的對應關係。最多只能用 8 位來表示(一個位元組),即:2**8 = 256 ...
  • 樹的基本概念和常用術語 節點的度:一個結點的兒子結點個數稱為該節點的度 樹的度:一棵樹的度是指該樹中結點的最大度數。如上圖的樹的度是3 葉節點或終端節點:度為零的節點。如上圖中E,I,J,C,G,H是葉節點 非終端節點或分支節點:度不為零的節點。除根節點外的分支節點都叫做內部節點。 路徑:若存在樹中 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...