[TOC] 簡介 服務之間的同步調用,可以使用 HTTP 或 RPC 來完成,但並非所有的調用都需要同步,有些場景下,當客戶端調用服務端時,並不需要等待服務端做出響應,此時就應該使用非同步調用。非同步調用的常用方式是基於 MQ (Message Queue) 來實現的。下文會以 ActiveMQ 為例進 ...
目錄
簡介
服務之間的同步調用,可以使用 HTTP 或 RPC 來完成,但並非所有的調用都需要同步,有些場景下,當客戶端調用服務端時,並不需要等待服務端做出響應,此時就應該使用非同步調用。非同步調用的常用方式是基於 MQ (Message Queue) 來實現的。下文會以 ActiveMQ 為例進行講解。
ActiveMQ 是 Java 世界中最為流行的開源消息中間件,它不僅功能強大,而且性能穩定。它可全面支持 JMS(Java 消息服務)技術規範,為 Java 應用程式提供標準的 JMS API。
此外 ActiveMQ 具備與 Spring 框架整合的能力,它一直都是 Spring 應用程式的消息中間件標配。同樣, Spring Boot 也提供了 ActiveMQ 的開箱即用的插件,只需要幾項配置,就能接入 ActiveMQ,並輕鬆使用 JMS API 編寫非同步消息通信程式。
Active MQ 官網地址如下
http://activemq.apache.org
啟動 ActiveMQ 伺服器
先使用 docker 安裝 ActiveMQ ,目前 ActiveMQ 官方並未提供相應的 Docker 鏡像,我們選擇使用第三方鏡像 webcenter/activemq
。
docker pull webcenter/activemq:5.14.3
接下來運行 ActiveMQ
docker run -d -p 8161:8161 -p 61616:61616 -e ACTIVEMQ_ADMIN_LOGIN=admin -e ACTIVEMQ_ADMIN_PASSWORD=admin --name activemq webcenter/activemq:5.14.3
在啟動 ActiveMQ 容器時,容器對宿主機暴露了兩個埠號:
- 8161: 表示 ActiveMQ 控制臺端口號,可在瀏覽器中通過控制台來執行 ActiveMQ 的相關操作
- 61616: 表示 ActiveMQ 所監聽的 TCP 埠號,應用程式可通過該埠號與 ActiveMQ 建立 TCP 連接,並完成後續的非同步消息通信
此外,在啟動 ActiveMQ 容器時,還提供了兩個環境變數
- ACTIVEMQ_ADMIN_LOGIN: 用於設置控制台管理員的用戶名,預設為 admin
- ACTIVEMQ_ADMIN_PASSWORD: 用於設置控制台管理員的密碼,預設為 admin
查看控制台
webcenter/activemq
鏡像擁有一個基於 Web 的控制台,可通過瀏覽器訪問。容器啟動完畢後,可以打開瀏覽器,併在地址欄中輸入 http://localhost:8161
點擊 Manage ActiveMQ borker 鏈接,瀏覽器將彈出一個對話框,此時輸入用戶名和密碼,認證通過後會進入管理界面
在管理界面中,包括 8 個功能菜單
- Home: 基本信息
- Queues: 管理的隊列
- Topics: 查看所管理的主題
- Subscribers: 查看相關主題的訂閱者
- Connections: 查看客戶端的連接信息
- Network: 查看網路相關信息
- Scheduled: 查看 ActiveMQ 內部運行的定時任務
- Send: 通過表單方式查看向隊列或主題發送具體消息
ActiveMQ 的消息通道
ActiveMQ 管理了兩類消息通道,一類是隊列(Queue),另一類叫做主題(Topic)。
Queue
Queue 用於解決消息的 點對點 通信問題,也就是說,消息從生產者(Producer) 發出後,首先進入 ActiveMQ 某個指定的 Queue 中,然後再將消息傳送給其中一個消費者(Consumer)。
Topic
Topic 用於解決消息的發佈與訂閱(Publish-subscribe) 通信問題,也就是說,消息從 Producer 發出後,首先將其發佈到 ActiveMQ 某個指定的 Topic 上,然後將此消息分發給每個訂閱者(Subscriber) 。
比較
在具體場合下,靈活使用以上兩種通信模式來實現 Producer 與 Consumer/Subscriber 間的非同步調用,從而解決調用方的耦合問題。可見,Queue 能解決調用緩衝問題,Topic 能解決消息廣播問題, Queue 與 Topic 都能解決掉調用耦合問題,這些技術都為一個好的軟體架構提供了有效的支撐。
開發生產者和消費者
下麵就以 Queue 為例,將 ActiveMQ 與 Spring Boot 進行整合,將 Producer 作為客戶端, Consumer 作為服務端,通過 Queue 實現客戶端與服務端的非同步調用
開發服務端(消費者)
首先創建一個名為 acitvemq-hello-server
的 spring boot 項目,如果在 eclipse 中安裝了 Spring Tools ,可以在新建時選擇 New Spring Starter Project
選項。或者新建 Maven 工程。對應的 maven 依賴如下
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.19.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
在 Spring Boot 框架中已經內置了對 ActiveMQ 的支持,我們只需要依賴 spring-boot-starter-mq 就能啟動 ActiveMQ,此時還需要在 application.properties
文件中添加 ActiveMQ 配置項
spring.activemq.broker-url=tcp://10.104.10.1:61616
spring.activemq.user=admin
spring.activemq.password=admin
接下來創建 HelloServer
的類,封裝服務端相關代碼
@Component
public class HelloServer {
@JmsListener(destination="hello-queue")
public void receive(String message) {
System.out.println(message);
}
}
使用 @Component
註解,說明它可被 Spring IoC 容器所管理。此時只需要使用 @JmsListener
註解,並將其綁定到 receive()
方法上,就能從 ActiveMQ 中接收響應的消息。
@JmsListener
註解中需要添加一個 destination 屬性來指定 Queue/Topic
的名稱,該名稱具有唯一性。消息將以一個 String 類型參數的形式傳入方法體中,也可以接收其他類型的消息,這取決於客戶端發送的消息是哪種類型。Spring JMS 將消息放入 ActiveMQ 時會進行序列化,當消息從 ActiveMQ 取出時將進行反序列化,應用程式無需關註這些底層細節,只需要將精力放在業務邏輯上。
最後,編寫一個 Spring Boot 應用程式啟動類來啟動服務端(使用 spring tools 工具會自動生成)
@SpringBootApplication
public class ActivemqHelloServerApplication {
public static void main(String[] args) {
SpringApplication.run(ActivemqHelloServerApplication.class, args);
}
}
當服務端啟動完畢後,將一直監聽 ActiveMQ 的 hello-queue 隊列中即將到來的消息,消息由客戶端來發送。
開發客戶端(生產者)
創建一個名為 active-mq-client
的 Maven 項目, pom.xml 文件內容與服務端相似。application.properties 文件與服務端相同。
接下來創建一個名為 HelloClient
的類,將其作為客戶端。
@Component
public class HelloClient {
@Autowired
private JmsTemplate jmsTemplate;
public void send(String message) {
jmsTemplate.convertAndSend("hello-queue", message);
}
}
這裡使用了 @Autowired
註解, JmsTemplate 對象註入進來,還編寫了一個 send()
方法,在該方法中調用 JmsTemplate 對象的 convertAndSend
來轉換併發送消息。
最後使用 Spring Boot 應用程式啟動類來啟動客戶端
@SpringBootApplication
public class ActivemqHelloClientApplication {
@Autowired
private HelloClient helloClient;
@PostConstruct
public void init() {
helloClient.send("hello world");
}
public static void main(String[] args) {
SpringApplication.run(ActivemqHelloClientApplication.class, args);
}
}
需要註意的是, init()
方法帶有 @PostConstruct
註解,表示 Spring IoC 容器實例化 ActivemqHelloClientApplication
類後將調用該方法。
運行 main() 方法可以啟動客戶端應用程式,並可以在服務端應用程式控制臺中看到 client 發送的消息,也可以在 ActiveMQ 控制臺中查看隊列的當前狀態
Queue 表格中列明的含義如下
- Name 表示隊列名稱,可在應用程式中自動創建,也可在 ActiveMQ 控制臺中手動創建
- Number Of Pending Messages 表示阻塞在隊列中未經消費的消息條數
- Number Of Consumers 表示正在與 ActiveMQ 建立連接的消費者數量
- Messages Enqueued 表示進入隊列的消息數量
- Messages Dequeued 表示離開隊列的消息數量
此外,還有下麵幾種操作
- Browser 用於查看當前隊列中消息的相關細節
- Active Consumers 用於查看當前活動消費者的相關信息
- Active Producers 用於查看當前活動生產者的相關信息
- Send To 用於向當前隊列中發送具體消息
- Purge 用於清空隊列中的消息
- Delete 用於刪除當前隊列
參考
- 《架構探險—輕量級微服務架構》