[TOC] RabbitMQ 特點 RabbitMQ 相較於其他消息隊列,有一系列防止消息丟失的措施,擁有強悍的高可用性能,它的吞吐量可能沒有其他消息隊列大,但是其消息的保障性出類拔萃,被廣泛用於金融類業務。與其他消息隊列的比較以及強大的防止消息丟失的能力我們將在後續文章再做介紹。 AMQP 協議 ...
目錄
RabbitMQ 特點
RabbitMQ 相較於其他消息隊列,有一系列防止消息丟失的措施,擁有強悍的高可用性能,它的吞吐量可能沒有其他消息隊列大,但是其消息的保障性出類拔萃,被廣泛用於金融類業務。與其他消息隊列的比較以及強大的防止消息丟失的能力我們將在後續文章再做介紹。
AMQP 協議
AMQP: Advanced Message Queuing Protocol 高級消息隊列協議
AMQP定義:是具有現代特征的二進位協議。是一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。
Erlang語言最初在於交換機領域的架構模式,這樣使得RabbitMQ在Broker之間進行數據交互的性能是非常優秀的
Erlang的優點: Erlang有著和原生Socket一樣的延遲。
RabbitMQ是一個開源的消息代理和隊列伺服器,用來通過普通協議在完全不同的應用之間共用數據, RabbitMQ是使用Erlang語言來編寫的,並且RabbitMQ是基於AMQP協議的。
RabbitMQ 消息傳遞機制
生產者發送消息到指定的 Exchange,Exchange 依據自身的類型(direct、topic等),根據 routing key 將消息發送給 0 - n 個 隊列,隊列再將消息轉發給了消費者。
Server: 又稱Broker, 接受客戶端的連接,實現AMQP實體服務,這裡指RabbitMQ 伺服器
Connection: 連接,應用程式與Broker的網路連接。
Channel: 網路通道,幾乎所有的操作都在 Channel 中進行,Channel是進行消息讀寫的通道。客戶端可建立多個Channel:,每個Channel代表一個會話任務。
Virtual host: 虛似地址,用於迸行邏輯隔離,是最上層的消息路由。一個 Virtual Host 裡面可以有若幹個 Exchange和 Queue ,同一個 VirtualHost 裡面不能有相同名稱的 Exchange 或 Queue。許可權控制的最小粒度是Virtual Host。
Binding: Exchange 和 Queue 之間的虛擬連接,binding 中可以包含 routing key。
Routing key: 一 個路由規則,虛擬機可用它來確定如何路由一個特定消息,即交換機綁定到 Queue 的鍵。
Queue: 也稱為Message Queue,消息隊列,保存消息並將它們轉發給消費者。
Message
消息,伺服器和應用程式之間傳送的數據,由 Properties 和 Body 組成。Properties 可以對消息進行修飾,比如消息的優先順序、延遲等高級特性;,Body 則就 是消息體內容。
properties 中我們可以設置消息過期時間以及是否持久化等,也可以傳入自定義的map屬性,這些在消費端也都可以獲取到。
生產者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
public class MessageProducer {
public static void main(String[] args) throws Exception {
//1. 創建一個 ConnectionFactory 併進行設置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2. 通過連接工廠來創建連接
Connection connection = factory.newConnection();
//3. 通過 Connection 來創建 Channel
Channel channel = connection.createChannel();
//4. 聲明 使用預設交換機 以隊列名作為 routing key
String queueName = "msg_queue";
/**
* deliverMode 設置為 2 的時候代表持久化消息
* expiration 意思是設置消息的有效期,超過10秒沒有被消費者接收後會被自動刪除
* headers 自定義的一些屬性
* */
//5. 發送
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("myhead1", "111");
headers.put("myhead2", "222");
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("100000")
.headers(headers)
.build();
String msg = "test message";
channel.basicPublish("", queueName, properties, msg.getBytes());
System.out.println("Send message : " + msg);
//6. 關閉連接
channel.close();
connection.close();
}
}
消費者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class MessageConsumer {
public static void main(String[] args) throws Exception{
//1. 創建一個 ConnectionFactory 併進行設置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
//2. 通過連接工廠來創建連接
Connection connection = factory.newConnection();
//3. 通過 Connection 來創建 Channel
Channel channel = connection.createChannel();
//4. 聲明
String queueName = "msg_queue";
channel.queueDeclare(queueName, false, false, false, null);
//5. 創建消費者並接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
Map<String, Object> headers = properties.getHeaders();
System.out.println("head: " + headers.get("myhead1"));
System.out.println(" [x] Received '" + message + "'");
System.out.println("expiration : "+ properties.getExpiration());
}
};
//6. 設置 Channel 消費者綁定隊列
channel.basicConsume(queueName, true, consumer);
}
}
Send message : test message
head: 111
[x] Received 'test message'
100000
Exchange
1. 簡介
Exchange 就是交換機,接收消息,根據路由鍵轉發消息到綁定的隊列。有很多的 Message 進入到 Exchange 中,Exchange 根據 Routing key 將 Message 分發到不同的 Queue 中。
2. 類型
RabbitMQ 中的 Exchange 有多種類型,類型不同,Message 的分發機制不同,如下:
- fanout:廣播模式。這種類型的 Exchange 會將 Message 分發到綁定到該 Exchange 的所有 Queue。
direct:這種類型的 Exchange 會根據 Routing key(精確匹配,將Message分發到指定的Queue。
- Topic:這種類型的 Exchange 會根據 Routing key(模糊匹配,將Message分發到指定的Queue。
headers: 主題交換機有點相似,但是不同於主題交換機的路由是基於路由鍵,頭交換機的路由值基於消息的header數據。 主題交換機路由鍵只有是字元串,而頭交換機可以是整型和哈希值 .
3. 屬性
/**
* Declare an exchange, via an interface that allows the complete set of
* arguments.
* @see com.rabbitmq.client.AMQP.Exchange.Declare
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
* @param exchange the name of the exchange
* @param type the exchange type
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
* @param autoDelete true if the server should delete the exchange when it is no longer in use
* @param internal true if the exchange is internal, i.e. can't be directly
* published to by a client.
* @param arguments other properties (construction arguments) for the exchange
* @return a declaration-confirm method to indicate the exchange was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Exchange.DeclareOk exchangeDeclare(String exchange,
String type,boolean durable,
boolean autoDelete,boolean internal,
Map<String, Object> arguments) throws IOException;
- Name: 交換機名稱
- Type: 交換機類型direct、topic、 fanout、 headers
- Durability: 是否需要持久化,true為持久化
- Auto Delete: 當最後一個綁定到Exchange. 上的隊列刪除後,自動刪除該Exchange
- Internal: 當前Exchange是否用於RabbitMQ內部使用,預設為False
- Arguments: 擴展參數,用於擴展AMQP協議自製定化使用