@ 一、前言 我們先來聊聊消息中間件: 消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通信來進行分散式系統的集成。通過提供消息傳遞和消息排隊模型,它可以在分散式環境下擴展進程間的通信。(來自百度百科) 我們常見的中間件其實有很多種了,例如ActiveMQ、RabbitMQ、R ...
@
目錄- 一、前言
- 二、RabbitMQ作用
- 三、RabbitMQ概念
- 四、JMS與AMQP比較
- 五、RabbitMQ運行機制
- 六、Docker安裝RabbitMQ
- 七、整合Springboot
- 八、測試創建交換機、隊列、綁定關係
- 九、測試發消息
- 十、測試收消息
- 十一、總結
一、前言
我們先來聊聊消息中間件:
消息中間件利用高效可靠的消息傳遞機制
進行平臺無關的數據交流,並基於數據通信來進行分散式系統的集成。通過提供消息傳遞和消息排隊模型,它可以在分散式環境下擴展進程間的通信
。(來自百度百科)
我們常見的中間件其實有很多種了,例如ActiveMQ、RabbitMQ、RocketMQ、Kafka、ZeroMQ等,其中應用最為廣泛的要數RabbitMQ、RocketMQ、Kafka 這三款。Redis在某種程度上也可以使用list或者Stream來實現消息隊列,但不能算是中間件哈!
如果大家對怎麼選型感興趣,可以看一下小編的這篇文章:四大MQ選型
今天小編帶著大家一起學習一下RabbitMQ,從入門到精通,從無到有!!小編沒有使用Windows安裝,很麻煩,所以使用Docker安裝。如果沒有安裝Docker的可以看一下小編的另一篇文章:Linux安裝Docker
小編其實也是通過雷神的課件和講解後,自己在整理一下,供以後學習和參考,在此感謝尚矽谷雷神哈!
小編覺得在說概念之前,應該知道他的作用,然後再系統的學習概念等!
二、RabbitMQ作用
其實作用還是挺多的,但是主要是以下三條:
- 非同步處理
- 應用解耦
- 流量控制
下麵我們進行一個個的簡單描述一下哈,我們還是拿被用了一萬次的例子和圖例哈!!
1. 非同步處理
用戶在某網站註冊成功後,需要向用戶發送郵件和信息提示其註冊成功(其實沒什麼必要,但是例子說一下還可以,小編自己的理解哈!)。常規的做法是:後臺將註冊信息保存到資料庫,然後再給用戶發郵件發簡訊。
我們看到這樣非常的耗時,其實保存完成後,就可以登錄了,簡訊和郵件過一會接收也是沒有什麼問題的!或者發送失敗,用戶一直沒有收到,這都是沒什麼問題的,用戶已經登錄進去了,管你發不發簡訊,大家說對吧!!
既然存在問題,我們就是有消息隊列來解決這個問題:
我們可以在將註冊信息保存資料庫之後,把要發送註冊郵件和發送簡訊的消息寫入消息隊列,然後就告知用戶註冊成功。發送郵件和簡訊將由訂閱了消息的應用非同步的去執行。這樣耗時的問題就解決了!!
2. 應用解耦
在大型電商項目中,會將訂單系統和庫存系統分成兩個不同的應用。然後進行服務與服務之間的調用,正常情況下用戶下單後訂單系統會調用庫存系統,然後返回給用戶顯示下單成功。
但是也存在問題,如果庫存系統掛了,這樣就會導致下單失敗;如果你是用戶,你會判斷這個產品不行,以後不用了!!
彆著急,這位用戶,我們幫您解決哈:這時我們引入消息隊列進行解耦
現在有的同學會問,怎麼解決的呢?
彆著急,小編來和你說一下哈!剛剛出錯的原因就是庫存系統掛了,改處理的請求沒有處理,所以下單失敗;我們引入消息隊列,就是把訂單消息寫入到消息隊列中,然後庫存系統訂閱我們的消息隊列;然後庫存系統去消息隊列中獲取消息,進行處理訂單,來完成減庫存的操作;如果失敗也會有重試機制
,真的掛了,也可以持久化
,等到庫存系統活了之後繼續處理!!一個宗旨,不能影響用戶的使用體驗呢!!
3. 流量控制
看名字就能知道,肯定是併發很大的情況才會出現的,不用想就是秒殺時刻
了!
假設一瓶茅臺2萬人搶,這是我們的系統可能會被打垮。所以我們把超過一定併發量
時,把超過的請求放在消息隊列中,然後減緩系統壓力
,然後慢慢處理;雖然可能降低一下用戶的體驗,但是秒殺就是這樣,只能有一部分人成功,我們要保證好系統可以正常運行哈!!
三、RabbitMQ概念
1. RabbitMQ簡介
RabbitMQ是一個由erlang開發的AMQP
(Advanved Message Queue Protocol)的開源實現。
RabbitMQ 是部署最廣泛的開源消息代理。
RabbitMQ擁有數萬用戶,是最流行的開源消息代理之一。從T-Mobile到Runtastic,RabbitMQ在世界各地的小型初創公司和大型企業中使用。
RabbitMQ是輕量級的,易於在本地和雲中部署。它支持多種消息傳遞協議。RabbitMQ可以在分散式和聯合配置中部署,以滿足高規模、高可用性需求。
RabbitMQ運行在許多操作系統和雲環境上,併為最流行的語言提供了廣泛的開發工具。
2. 核心概念
Message
消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,
這些屬性包括routing-key
(路由鍵)、priority(相對於其他消息的優先權)、delivery-mode(指出該消息可
能需要持久性存儲)等。
Publisher
消息的生產者,也是一個向交換器發佈消息的客戶端應用程式。
Exchange
交換器,用來接收生產者發送的消息並將這些消息路由給伺服器中的隊列。
Exchange有4種類型:direct(預設),fanout, topic, 和headers,不同類型的Exchange轉發消息的策略有所區別Queue
消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點
。一個消息可投入一個或多個隊列。消息一直在隊列裡面,等待消費者連接到這個隊列將其取走。
Binding
綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。
Exchange 和Queue的綁定可以是多對多的關係
。
Connection
網路連接,比如一個TCP連接。
Channel
通道,多路復用連接中的一條獨立的雙向數據流通道
。通道是建立在真實的TCP連接內的虛擬連接,AMQP 命令都是通過通道發出去的,不管是發佈消息、訂閱隊列還是接收消息,這些動作都是通過通道完成。因為對於操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了通道的概念,以復用一條 TCP 連接。
Consumer
消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程式。
Virtual Host
虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共用相同的身份認證和加密環境的獨立伺服器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的隊列、交換器、綁定和許可權機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 預設的 vhost 是 / 。
類似docker容器和容器之間是相互隔離的,一個壞了,不耽誤另一個使用
Broker
表示消息隊列伺服器實體。
總架構圖
四、JMS與AMQP比較
JMS(Java Message Service) | AMQP(Advanced Message Queuing Protocol) | |
---|---|---|
定義 | Java api | 網路線級協議 |
跨語言 | 否 | 是 |
跨平臺 | 否 | 是 |
Model | 提供兩種消息模型: (1)、Peer-2-Peer (2)、Pub/sub |
提供了五種消息模型: (1)、direct exchange (2)、fanout exchange (3)、topic change (4)、headers exchange (5)、system exchange 本質來講,後四種和JMS的pub/sub模型沒有太大差別, 僅是在路由機制上做了更詳細的劃分; |
支持消息類 型 |
多種消息類型: TextMessage MapMessage BytesMessage StreamMessage ObjectMessage Message (只有消息頭和屬性) |
byte[] 當實際應用時,有複雜的消息,可以將消息序列化後發 送。 |
實現中間件 | ActiveMQ、HornetMQ | RabbitMQ |
綜合評價 | JMS 定義了JAVA API層面的標準;在java體系中, 多個client均可以通過JMS進行交互,不需要應用修 改代碼,但是其對跨平臺的支持較差; |
AMQP定義了wire-level層的協議標準;天然具有跨平 台、跨語言特性 |
五、RabbitMQ運行機制
AMQP 中消息的路由過程和 Java 開發者熟悉的 JMS 存在一些差別,AMQP 中增加了Exchange 和Binding
的角色。生產者把消息發佈到 Exchange 上,消息最終到達隊列並被消費者接收,而 Binding 決定交換器的消息應該發送到那個隊列
。
Exchange 類型
- direct
- fanout
- topic
- headers(不建議使用)
RabbitMQ預設七大交換機
1. direct
消息中的路由鍵(routing key)如果和Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名完全匹配,如果一個隊列綁定到交換機要求路由鍵為"a1.b1",則只轉發 routing key 標記為"a1.b1"的消息,不會轉發"a1.b2”,也不會轉發"a1.b3" 等等。它是完全匹配、單播的模式
。
2. fanout
每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。fanout 交換器不處理路由鍵
,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播
,每檯子網內的主機都獲得了一份複製的消息。fanout 類型轉發消息是最快的、廣播
。
3. topic
topic是升級版的fanout模式,做了選擇權,並不是全都會接受,符合條件才會收到!topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵和綁定鍵的字元串切分成單詞,這些單詞之間用點隔開。
它同樣也會識別兩個通配符:符號#
和符號*
。#
匹配0個或多個單詞,*
匹配一個單詞。
六、Docker安裝RabbitMQ
直接輸入命令,docker會幫助我們自動去拉去鏡像的:
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 \
-p 15671:15671 -p 15672:15672 rabbitmq:management
我們查詢是否運行成功
docker ps
我們在windows上進行測試是否能夠打開界面:
輸入:http://192.168.17.130:15672/
用戶名密碼都是:guest
進入界面:
七、整合Springboot
1. 引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--自定義消息轉化器Jackson2JsonMessageConverter所需依賴-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
2. 主啟動類上添加註解
@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication {
public static void main(String[] args) {
SpringApplication.run(GulimallOrderApplication.class, args);
}
}
3. 編寫配置文件
# 指定rabbitmq伺服器主機
spring.rabbitmq.host=192.168.17.130
# 賬號密碼埠號都預設配置了,我們無需配置
八、測試創建交換機、隊列、綁定關係
1. 測試創建Direct交換機
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createExchange() {
// 第一個參數為交換機名字,第二個參數為是否持久化,第三個參數為不使用交換機時刪除
DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
System.out.println("交換機創建成功");
}
2. 打開交換機界面查看
3. 創建Queue
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createQueue() {
/**
* 第一個參數為隊列名字,
* 第二個參數為是否持久化,
* 第三個參數為是否排他(true:一個連接只能有一個隊列,false:一個連接可以有多個(推薦))
* 第四個參數為不使用隊列時自動刪除
*/
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
System.out.println("隊列創建成功");
}
4. 打開隊列界面查看
5. 綁定交換機和隊列
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createBinding() {
/**
* 第一個參數為目的地,就是交換機或者隊列的名字
* 第二個參數為目的地類型,交換機還是隊列
* 第三個參數為交換機,
* 第四個參數為路由鍵,匹配的名稱
*/
Binding binding = new Binding("hello-java-queue",
Binding.DestinationType.QUEUE,
"hello-java-exchange",
"hello.java",null);
amqpAdmin.declareBinding(binding);
System.out.println("綁定成功");
}
6. 打開交換機中的綁定界面
點擊交換機
綁定列表
九、測試發消息
1. 測試發送消息
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMessageTest(){
// 消息類型為object 發送對象也是可以的
String msg = "這是一條消息";
// 第一個參數為發送消息到那個交換機上,第二個是發送的路由鍵(交換機進行需要符合綁定的隊列),第三個參數為發送的消息
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",msg);
System.out.println("消息發送成功");
}
2. Queue列表中查看消息
3. 手動確認消息
點擊我們的隊列:
進入詳細界面,下滑找到Get messages
:
在次點擊隊列,消息消失:
4. 測試發送對象
@Data
// 必須序列化,不然報錯
public class User implements Serializable {
private String name;
private Integer age;
}
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMessageTest(){
User user = new User();
user.setAge(22);
user.setName("王振軍");
// 第一個參數為發送消息到那個交換機上,第二個是發送的路由鍵(交換機進行需要符合綁定的隊列),第三個參數為發送的消息
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",user);
System.out.println("消息發送成功");
}
5. 查看消息發送的對象
6. 書寫消息發送對象轉JSON配置類
編寫配置類
@Configuration
public class MyRabbitmqConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
7. 再次發送九、4中的代碼,查詢是否正常顯示對象
十、測試收消息
1. 創建接收信息的方法
方法所在的類必須交給了IOC管理,我們直接寫在service裡面。代碼如下:
@Service
public class TestService {
// queues是監聽的隊列名字,可以是多個
@RabbitListener(queues = {"hello-java-queue"})
public void reciveMessage(Object message){
System.out.println("接受的信息" + message);
}
}
2. 模擬發送消息
還是用上面的方法進行發送一個對象!
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMessageTest(){
User user = new User();
user.setAge(22);
user.setName("王振軍");
// 第一個參數為發送消息到那個交換機上,第二個是發送的路由鍵(交換機進行需要符合綁定的隊列),第三個參數為發送的消息
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",user);
System.out.println("消息發送成功");
}
3. 接收消息查看
接受的信息:(Body:'{"name":"王振軍","age":22}'
MessageProperties [headers={__TypeId__=com.atguigu.gulimall.order.entity.User},
contentType=application/json, contentEncoding=UTF-8, contentLength=0,
receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false,
receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1,
consumerTag=amq.ctag-Nlg0mulsX9mxdPvGe72CBw, consumerQueue=hello-java-queue])
4. 在思考
我們發現剛剛返回的是詳細信息,我們可以指定消息的類型,就是發送消息發的對象是什麼,我們就可以直接接收就行!看代碼:
@Service
public class TestService {
@RabbitListener(queues = {"hello-java-queue"})
public void reciveMessage(Message message, User user){
System.out.println("接受的信息:" + message);
System.out.println("發送的信息:" + user);
}
}
在此發送消息,我們看一下控制台:
接受的信息:(Body:'{"name":"王振軍","age":22}'
MessageProperties [headers={__TypeId__=com.atguigu.gulimall.order.entity.User},
contentType=application/json, contentEncoding=UTF-8, contentLength=0,
receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false,
receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1,
consumerTag=amq.ctag-Nlg0mulsX9mxdPvGe72CBw, consumerQueue=hello-java-queue])
發送的信息:User(name=王振軍, age=22)
這樣就很清晰了哈!
拓展: 接收的還有第三個參數就是通道,每一個連接只會有一個通道哈!,大家可以自己測試一下,列印看看一下!!
public void reciveMessage(Message message, User user, Channel channel)
5. 多個服務監聽同一條隊列
右擊已存在服務,複製一份配置不同埠:
現在有兩個服務監聽同一個隊列!!
6. 模擬發送十條消息,查看會被那個服務接收
調整測試發消息代碼:
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMessageTest(){
for (int i = 0;i < 10; i++) {
User user = new User();
user.setAge(i);
user.setName("王振軍" + i);
// 第一個參數為發送消息到那個交換機上,第二個是發送的路由鍵(交換機進行需要符合綁定的隊列),第三個參數為發送的消息
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", user);
System.out.println("消息發送成功");
}
}
接收消息的代碼:
@Service
public class TestService {
@RabbitListener(queues = {"hello-java-queue"})
public void reciveMessage(Message message, User user){
System.out.println("接收的信息:" + user);
}
}
7. 多服務接收消息結果查看
我們看到9000服務接收了1,4,7
消息
9010服務接收了0,3,6,9
消息
總結: 我們可以發現一個消息只會被接收一次!
還有就是發了10條消息,只有7條被接收了,其餘的呢?
別急小編來告訴大家,這是因為我們測試是使用SpringBoot的測試類進行的,有的部分消息被測試的接收了!大家不信可以看一下測試的控制台,找一下:
我們看到消息的2,5,8
在這裡呢!!
拓展: 我們一次發送十條消息,每條接收消息假如耗時10s,此時會接收處理完一個消息,才會接收下一個,就是我們說的串列化!!
十一、總結
這樣我們就對RabbitMQ有了新的認識,從入門也算走上了實踐!後面有時間小編再把消息的可靠性發出來,也就是進階版!!
在次感謝雷神的課程哈,看到這裡,小伙伴們點個贊唄,小編整理不易呀!!謝謝大家了!!
有緣人才可以看得到的哦!!!