前提 上次寫了篇文章,《SpringBoot Kafka 整合使用》,閱讀量還挺高的,於是想想還是把其他幾種 MQ 也和 SpringBoot 整合使用下。 下麵是四種比較流行的 MQ : 後面都寫寫和 SpringBoot 整合的文章。 安裝 RabbitMQ 由於換 Mac 了,所以一些環境就直 ...
前提
上次寫了篇文章,《SpringBoot Kafka 整合使用》,閱讀量還挺高的,於是想想還是把其他幾種 MQ 也和 SpringBoot 整合使用下。
下麵是四種比較流行的 MQ :
後面都寫寫和 SpringBoot 整合的文章。
安裝 RabbitMQ
由於換 Mac 了,所以一些環境就直接在 Mac 搞,但是像安裝 RabbitMQ 這些又會把自己電腦系統給搞的太亂,所以能在 Docker 裡面安裝就安裝在 Docker,這次 RabbitMQ 我也直接在 Docker 里安裝。
啟動 Docker for Mac,如果沒安裝過的請看我上一篇文章:http://www.54tianzhisheng.cn/...
當然你也可以在自己的 Linux 伺服器或者虛擬機里啟動安裝 RabbitMQ 。
Docker 安裝的話很簡單,因為 RabbitMQ 官方已經提供了自己的 Docker 容器,只需要一行命令:(可右移查看完整代碼)
docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management
該鏡像擁有一個基於 web 的控制台和 Http API。Http API 可以在地址看到如何使用:http://localhost:15672/api/
講解下上面命令行:
- 15672 :表示 RabbitMQ 控制臺端口號,可以在瀏覽器中通過控制台來執行 RabbitMQ 的相關操作。
- 5672 : 表示 RabbitMQ 所監聽的 TCP 埠號,應用程式可通過該埠與 RabbitMQ 建立 TCP 連接,並完成後續的非同步消息通信
- RABBITMQDEFAULTUSER:用於設置登陸控制台的用戶名,這裡我設置 admin
- RABBITMQDEFAULTPASS:用於設置登陸控制台的密碼,這裡我設置 admin
容器啟動成功後,可以在瀏覽器輸入地址:http://localhost:15672/ 訪問控制台
登陸後:
簡單描述下上圖中中控制台的列表的作用:
- Overview :用於查看 RabbitMQ 的一些基本信息(消息隊列、消息發送速率、節點、埠和上下文信息等)
- Connections:用於查看 RabbitMQ 客戶端的連接信息
- Channels:用戶查看 RabbitMQ 的通道信息
- Exchange:用於查看 RabbitMQ 交換機
- Queues:用於查看 RabbitMQ 的隊列
- Admin:用於管理用戶,可增加用戶
創建項目
在 IDEA 中創建一個 SpringBoot 項目結構:
SpringBoot 框架中已經內置了對 RabbitMQ 的支持,如果你看過官方文檔的話,就可以看到的,我們需要把依賴 spring-boot-starter-amqp 引入就行。
1、 pom.xml 引入依賴後如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zhisheng</groupId>
<artifactId>rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>rabbitmq</name>
<description>Demo project for Spring Boot RabbitMQ</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2、application.properties 配置修改如下:
spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
3、消息發送類 RabbitMQClient.java
package com.zhisheng.rabbitmq.client;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/** * Created by zhisheng_tian on 2018/1/23 */
@Componentpublic
class RabbitMQClient {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String message) { rabbitTemplate.convertAndSend("zhisheng", message); }
}
就這樣,發送消息代碼就實現了。
這裡關鍵的代碼為 rabbitTemplate.convertAndSend() 方法, zhisheng
這個是路由規則(routingKey),它的值表明將消息發送到指定的隊列 zhisheng
中去,這裡跟了下源碼,發現 convertAndSend() 方法最後調用的方法其實是一個 doSend() 方法。
4、消息接收類
package com.zhisheng.rabbitmq.server;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/** * Created by zhisheng_tian on 2018/1/23 */
@Componentpublic
class RabbitMQServer {
@RabbitListener(queues = "zhisheng")
public void receive(String message) { System.out.println("收到的 message 是:" + message); }
}
你看,這裡就有個 RabbitListener
一直在監聽著隊列 zhisheng
。
當然這個隊列是必須要我們自己在應用程式中創建好,它不會像我之前寫的文章 《SpringBoot Kafka 整合使用》 中的 Kafka 一樣,Kafka 它會在用到隊列的時候動態的創建,不需要我們提前創建好。
那麼在 RabbitMQ 中該如何創建隊列呢?
如上圖所示:這樣我們就創建好了一個 zhisheng
的隊列,當程式開始運行時,消息接收類會持續監聽隊列 zhisheng
中即將到來的消息。
5、運行項目
需要在啟動類中註入發送消息的類,並且提供 init 方法,在 init 方法中調用發送消息類的 send() 方法
@PostConstructpublic
void init() { rabbitMQClient.send("發送消息----zhisheng-----"); }
需要註意的是:init() 方法帶有 @PostConstruct 註解,被 @PostConstruct 修飾的方法會在構造函數之後執行。
啟動項目就可以發現控制台已經接收到消息了。
6、單線程測試性能
看到上面圖片中註釋掉的代碼沒?那就是用來測試消息發送的性能的,我發送 10000 條消息看看總共耗時多少。
10000 條消息發送耗時:215ms。這是在單線程下,下次可以和其他的 MQ 測試對比下,並且也可以在多線程的環境下測試性能。
同時從控制台可以看到發送的速率:
7、多線程測試性能
開了10 個線程,每個線程發送 10000 條消息。
init 方法代碼如下:
@PostConstruct
public void init() {
StopWatch stopWatch = new StopWatch(); stopWatch.start();
int threads = 10; ExecutorService executorService = Executors.newFixedThreadPool(threads);
final CountDownLatch start = new CountDownLatch(1); final CountDownLatch end = new CountDownLatch(threads);
for (int i = 0; i < threads; i++) {
executorService.execute(( ) -> {
try {
start.await(); for (int i1 = 0; i1 < 10000; i1++) {
rabbitMQClient.send("發送消息----zhisheng-----");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
end.countDown();
}
} );
}
start.countDown(); try {
end.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
stopWatch.stop(); System.out.println("發送消息耗時:" + stopWatch.getTotalTimeMillis());
}
耗時:4063ms
控制台顯示如下圖:
8、註意
這裡測試發送的消息直接是 String 類型的,你也可以測試下 Bean 類,這需要註意需要序列化。
推薦閱讀:
為什麼選擇 Spring 作為 Java 框架?
SpringBoot RocketMQ 整合使用和監控
上篇好文: