今天,我們將通過 topic構建一些彼此非同步通信的微服務。我們使用 框架,它為與 集成提供專門的庫。讓我們簡要介紹一下示例系統的架構。我們有四個微型服務: ,`行程服務 司機服務 乘客服務 Kafka`實例。 我們系統的主要目標是為客戶安排行程。訂單服務應用程式還充當網關。它接收來自客戶的請求,保存 ...
今天,我們將通過Apache Kafka
topic構建一些彼此非同步通信的微服務。我們使用Micronaut
框架,它為與Kafka
集成提供專門的庫。讓我們簡要介紹一下示例系統的架構。我們有四個微型服務:訂單服務
,行程服務
,司機服務
和乘客服務
。這些應用程式的實現非常簡單。它們都有記憶體存儲,並連接到同一個Kafka
實例。
我們系統的主要目標是為客戶安排行程。訂單服務應用程式還充當網關。它接收來自客戶的請求,保存歷史記錄並將事件發送到orders
topic。所有其他微服務都在監聽orders
這個topic,並處理order-service
發送的訂單。每個微服務都有自己的專用topic,其中發送包含更改信息的事件。此類事件由其他一些微服務接收。架構如下圖所示。
在閱讀本文之前,有必要熟悉一下Micronaut
框架。您可以閱讀之前的一篇文章,該文章描述了通過REST API構建微服務通信的過程
:使用microaut框架構建微服務的快速指南。
1. 運行Kafka
要在本地機器上運行Apache Kafka
,我們可以使用它的Docker映像。最新的鏡像是由https://hub.docker.com/u/wurstmeister共用的。在啟動Kafka
容器之前,我們必須啟動kafka
所用使用的ZooKeeper
伺服器。如果在Windows
上運行Docker
,其虛擬機的預設地址是192.168.99.100
。它還必須設置為Kafka
容器的環境。
Zookeeper
和Kafka
容器都將在同一個網路中啟動。在docker中運行Zookeeper以zookeeper
的名稱提供服務,併在暴露2181
埠。Kafka
容器需要在環境變數使用KAFKA_ZOOKEEPER_CONNECT
的地址。
$ docker network create kafka
$ docker run -d --name zookeeper --network kafka -p 2181:2181 wurstmeister/zookeeper
$ docker run -d --name kafka -p 9092:9092 --network kafka --env KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 wurstmeister/kafka
2. 引入Micronaut Kafka依賴
使用Kafka
構建的microaut
應用程式可以在HTTP伺服器存在的情況下啟動,也可以在不存在HTTP伺服器的情況下啟動。要啟用Micronaut Kafka
,需要添加micronaut-kafka
庫到依賴項。如果您想暴露HTTP API
,您還應該添加micronaut-http-server-netty
:
<dependency>
<groupId>io.micronaut.configuration</groupId>
<artifactId>micronaut-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-http-server-netty</artifactId>
</dependency>
3. 構建訂單微服務
訂單微服務
是唯一一個啟動嵌入式HTTP伺服器並暴露REST API
的應用程式。這就是為什麼我們可以為Kafka
提供內置Micronaut
健康檢查。要做到這一點,我們首先應該添加micronaut-management
依賴:
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-management</artifactId>
</dependency>
為了方便起見,我們將通過在application.yml
中定義以下配置來啟用所有管理端點並禁用它們的HTTP身份驗證。
endpoints:
all:
enabled: true
sensitive: false
現在,可以在地址http://localhost:8080/health下使用health check
。我們的示例應用程式還將暴露添加新訂單
和列出所有以前創建的訂單
的簡單REST API
。下麵是暴露這些端點的Micronaut
控制器實現:
@Controller("orders")
public class OrderController {
@Inject
OrderInMemoryRepository repository;
@Inject
OrderClient client;
@Post
public Order add(@Body Order order) {
order = repository.add(order);
client.send(order);
return order;
}
@Get
public Set<Order> findAll() {
return repository.findAll();
}
}
每個微服務都使用記憶體存儲庫實現。以下是訂單微服務(Order-Service)
中的存儲庫實現:
@Singleton
public class OrderInMemoryRepository {
private Set<Order> orders = new HashSet<>();
public Order add(Order order) {
order.setId((long) (orders.size() + 1));
orders.add(order);
return order;
}
public void update(Order order) {
orders.remove(order);
orders.add(order);
}
public Optional<Order> findByTripIdAndType(Long tripId, OrderType type) {
return orders.stream().filter(order -> order.getTripId().equals(tripId) && order.getType() == type).findAny();
}
public Optional<Order> findNewestByUserIdAndType(Long userId, OrderType type) {
return orders.stream().filter(order -> order.getUserId().equals(userId) && order.getType() == type)
.max(Comparator.comparing(Order::getId));
}
public Set<Order> findAll() {
return orders;
}
}
記憶體存儲庫存儲Order
對象實例。Order
對象還被髮送到名為orders
的Kafkatopic。下麵是Order
類的實現:
public class Order {
private Long id;
private LocalDateTime createdAt;
private OrderType type;
private Long userId;
private Long tripId;
private float currentLocationX;
private float currentLocationY;
private OrderStatus status;
// ... GETTERS AND SETTERS
}
4. 使用Kafka非同步通信
現在,讓我們想一個可以通過示例系統實現的用例——添加新的行程
。
我們創建了OrderType.NEW_TRIP
類型的新訂單。在此之後,(1)訂單服務
創建一個訂單並將其發送到orders
topic。訂單由三個微服務接收:司機服務
、乘客服務
和行程服務
。
(2)所有這些應用程式都處理這個新訂單。乘客服務
應用程式檢查乘客帳戶上是否有足夠的資金。如果沒有,它就取消了行程,否則什麼也做不了。司機服務
正在尋找最近可用的司機,(3)行程服務
創建和存儲新的行程。司機服務
和行程服務
都將事件發送到它們的topic(drivers
, trips
),其中包含相關更改的信息。
每一個事件可以被其他microservices
訪問,例如,(4)行程服務
偵聽來自司機服務
的事件,以便為行程分配一個新的司機
下圖說明瞭在添加新的行程時,我們的微服務之間的通信過程。
現在,讓我們繼續討論實現細節。
4.1. 發送訂單
首先,我們需要創建Kafka 客戶端,負責向topic發送消息。我們創建的一個介面,命名為OrderClient
,為它添加@KafkaClient
並聲明用於發送消息的一個或多個方法。每個方法都應該通過@Topic
註解設置目標topic名稱。對於方法參數,我們可以使用三個註解@KafkaKey
、@Body
或@Header
。@KafkaKey
用於分區,這是我們的示例應用程式所需要的。在下麵可用的客戶端實現中,我們只使用@Body
註解。
@KafkaClient
public interface OrderClient {
@Topic("orders")
void send(@Body Order order);
}
4.2. 接收訂單
一旦客戶端發送了一個訂單,它就會被監聽orders
topic的所有其他微服務接收。下麵是司機服務
中的監聽器實現。監聽器類OrderListener
應該添加@KafkaListener
註解。我們可以聲明groupId
作為一個註解參數,以防止單個應用程式的多個實例接收相同的消息。然後,我們聲明用於處理傳入消息的方法。與客戶端方法相同,應該通過@Topic
註解設置目標topic名稱,因為我們正在監聽Order
對象,所以應該使用@Body
註解——與對應的客戶端方法相同。
@KafkaListener(groupId = "driver")
public class OrderListener {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);
private DriverService service;
public OrderListener(DriverService service) {
this.service = service;
}
@Topic("orders")
public void receive(@Body Order order) {
LOGGER.info("Received: {}", order);
switch (order.getType()) {
case NEW_TRIP -> service.processNewTripOrder(order);
}
}
}
4.3. 發送到其他topic
現在,讓我們看一下司機服務
中的processNewTripOrder
方法。DriverService
註入兩個不同的Kafka Client
bean: OrderClient
和DriverClient
。當處理新訂單時,它將試圖尋找與發送訂單的乘客最近的司機。找到他之後,將該司機的狀態更改為UNAVAILABLE
,並將帶有Driver
對象的事件發送到drivers
topic。
@Singleton
public class DriverService {
private static final Logger LOGGER = LoggerFactory.getLogger(DriverService.class);
private DriverClient client;
private OrderClient orderClient;
private DriverInMemoryRepository repository;
public DriverService(DriverClient client, OrderClient orderClient, DriverInMemoryRepository repository) {
this.client = client;
this.orderClient = orderClient;
this.repository = repository;
}
public void processNewTripOrder(Order order) {
LOGGER.info("Processing: {}", order);
Optional<Driver> driver = repository.findNearestDriver(order.getCurrentLocationX(), order.getCurrentLocationY());
driver.ifPresent(driverLocal -> {
driverLocal.setStatus(DriverStatus.UNAVAILABLE);
repository.updateDriver(driverLocal);
client.send(driverLocal, String.valueOf(order.getId()));
LOGGER.info("Message sent: {}", driverLocal);
});
}
// ...
}
這是Kafka Client
在司機服務
中的實現,用於向driver
topic發送消息。因為我們需要將Driver
與Order
關聯起來,所以我們使用@Header
註解 的orderId
參數。沒有必要把它包括到Driver
類中,將其分配給監聽器端的正確行程。
@KafkaClient
public interface DriverClient {
@Topic("drivers")
void send(@Body Driver driver, @Header("Order-Id") String orderId);
}
4.4. 服務間通信
由DriverListener
收到@KafkaListener
在行程服務
中聲明。它監聽傳入到trip
topic。接收方法的參數和客戶端發送方法的類似,如下所示:
@KafkaListener(groupId = "trip")
public class DriverListener {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);
private TripService service;
public DriverListener(TripService service) {
this.service = service;
}
@Topic("drivers")
public void receive(@Body Driver driver, @Header("Order-Id") String orderId) {
LOGGER.info("Received: driver->{}, header->{}", driver, orderId);
service.processNewDriver(driver);
}
}
最後一步,將orderId
查詢到的行程Trip
與driverId
關聯,這樣整個流程就結束。
@Singleton
public class TripService {
private static final Logger LOGGER = LoggerFactory.getLogger(TripService.class);
private TripInMemoryRepository repository;
private TripClient client;
public TripService(TripInMemoryRepository repository, TripClient client) {
this.repository = repository;
this.client = client;
}
public void processNewDriver(Driver driver, String orderId) {
LOGGER.info("Processing: {}", driver);
Optional<Trip> trip = repository.findByOrderId(Long.valueOf(orderId));
trip.ifPresent(tripLocal -> {
tripLocal.setDriverId(driver.getId());
repository.update(tripLocal);
});
}
// ... OTHER METHODS
}
5. 跟蹤
我們可以使用Micronaut Kafka
輕鬆地啟用分散式跟蹤。首先,我們需要啟用和配置Micronaut
跟蹤。要做到這一點,首先應該添加一些依賴項:
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-tracing</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-instrumentation-http</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.opentracing.brave</groupId>
<artifactId>brave-opentracing</artifactId>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-kafka-client</artifactId>
<version>0.0.16</version>
<scope>runtime</scope>
</dependency>
我們還需要在application.yml
配置文件中,配置Zipkin 的追蹤的地址等。
tracing:
zipkin:
enabled: true
http:
url: http://192.168.99.100:9411
sampler:
probability: 1
在啟動應用程式之前,我們必須運行Zipkin
容器:
$ docker run -d --name zipkin -p 9411:9411 openzipkin/zipkin
6. 總結
在本文中,您將瞭解通過Apache Kafka
使用非同步通信構建微服務架構的過程。我已經向大家展示了Microaut Kafka
庫最重要的特性,它允許您輕鬆地聲明Kafka
topic的生產者和消費者,為您的微服務啟用健康檢查
和分散式跟蹤
。我已經為我們的系統描述了一個簡單的場景的實現,包括根據客戶的請求添加一個新的行程。本示例系統的整體實現,請查看GitHub上的源代碼
原文鏈接:https://piotrminkowski.wordpress.com/2019/08/06/kafka-in-microservices-with-micronaut/
作者:Piotr's
譯者:李東