本文適合JAVA新人,想瞭解RabbitMQ又不想去看官網文檔的人(英語水看的頭疼(◎﹏◎),但建議有能力還是去看官網文檔)。 消息隊列MQ(一) MQ全稱為Message Queue,消息隊列是應用程式和應用程式之間的通信方法。 先引入一下常見的通訊方案。 為什麼使用MQ? 在項目中,可將一些無需 ...
本文適合JAVA新人,想瞭解RabbitMQ又不想去看官網文檔的人(英語水看的頭疼(◎﹏◎),但建議有能力還是去看官網文檔)。
消息隊列MQ(一)
MQ全稱為Message Queue,消息隊列是應用程式和應用程式之間的通信方法。
先引入一下常見的通訊方案。
為什麼使用MQ?
在項目中,可將一些無需即時返回且耗時的操作提取出來,進行非同步處理,而這種非同步處理的方式大大的節省伺服器的請求響應時間,從而提高了系統的吞吐量。
開發中消息隊列通常有如下應用場景:
應用解耦、非同步處理(提高系統響應速度)、流量削峰(高峰堆積消息,峰後繼續處理消息)、日誌處理(分散式日誌,一般使用kafka)、純粹通訊。
AMQP 和 JMS
MQ是消息通信的模型;實現MQ的大致有兩種主流方式:AMQP、JMS。
AMQP
AMQP高級消息隊列協議,是一個進程間傳遞非同步消息的網路協議,更準確的說是一種binary wire-level protocol(鏈接協議)。這是其和JMS的本質差別,AMQP不從API層進行限定,而是直接定義網路交換的數據格式。
JMS
JMS即Java消息服務(JavaMessage Service)應用程式介面,是一個Java平臺中關於面向消息中間件(MOM)的API,用於在兩個應用程式之間,或分散式系統中發送消息,進行非同步通信。
AMQP 與 JMS 區別
JMS是定義了統一的介面,來對消息操作進行統一;AMQP是通過規定協議來統一數據交互的格式
JMS限定了必須使用Java語言;AMQP只是協議,不規定實現方式,因此是跨語言的。
JMS規定了兩種消息模式;而AMQP的消息模式更加豐富。
消息隊列產品:目前市面上成熟主流的MQ有Kafka 、RocketMQ、RabbitMQ,本文主要介紹RabbitMQ使用。
使用Erlang(語言)編寫的一個開源的消息隊列,本身支持很多的協議:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它變的非常重量級,更適合於企業級的開發。同時實現了Broker架構,核心思想是生產者不會將消息直接發送給隊列,消息在發送給客戶端時先在中心隊列排隊。對路由(Routing),負載均衡(Load balance)、數據持久化都有很好的支持。多用於進行企業級的ESB整合。
RabbitMQ介紹
RabbitMQ是由erlang語言開發,基於AMQP(Advanced Message Queue 高級消息隊列協議)協議實現的消息隊列,它是一種應用程式之間的通信方法,消息隊列在分散式系統開發中應用非常廣泛。
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ提供了6種模式:簡單模式,work工作隊列(集群)模式,Publish/Subscribe發佈與訂閱(交換機的廣播)模式,Routing(交換機的定向)路由模式,Topics主題(路由靈活)模式,RPC遠程調用模式(遠程調用,不太算MQ;不作介紹);//括弧內的是自己的理解方式僅供參考。詳細可以去看官方介紹。
官網對應模式介紹:https://www.rabbitmq.com/getstarted.html
安裝RabbirMQ
兩種方式:windows環境與Linux環境(這裡跳過)
我是LinuxCenOS6.7安裝的3.6.10版本
啟動成功參考如下兩張圖
先在WEB頁面管理用戶
角色說明: Tags
1、超級管理員(administrator)
可登陸管理控制台,可查看所有的信息,並且可以對用戶,策略(policy)進行操作。
2、監控者(monitoring)
可登陸管理控制台,同時可以查看rabbitmq節點的相關信息(進程數,記憶體使用情況,磁碟使用情況等)
3、策略制定者(policymaker) :可登陸管理控制台, 同時可以對policy進行管理。但無法查看節點的相關信息(上圖紅框標識的部分)。
4、普通管理者(management):僅可登陸管理控制台,無法看到節點信息,也無法對策略進行管理。
5、其他 : 無法登陸管理控制台,通常就是普通的生產者和消費者。
Virtual Hosts配置
在RabbitMQ中可以虛擬消息伺服器Virtual Host,每個Virtual Hosts相當於一個相對獨立的RabbitMQ伺服器,每個VirtualHost之間是相互隔離的。exchange、queue、message不能互通。 相當於mysql的db。Virtual Name一般以/開頭。
添加隊列,這裡需要將上下兩張圖結合起來看
需改用戶的密碼
查看預設的交換機
常見的埠
RabbitMQ入門
目標:入門案例將使用RabbitMQ的簡單模式實現通訊過程。
1.創建Maven工程,先在pom.xml添加依賴。
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 <groupId>com.jxjdemo</groupId> 7 <artifactId>rabbitmq1_demo</artifactId> 8 <version>1.0-SNAPSHOT</version> 9 10 <dependencies> 11 <dependency> <!--rabbitmq的依賴--> 12 <groupId>com.rabbitmq</groupId> 13 <artifactId>amqp-client</artifactId> 14 <version>5.6.0</version> 15 </dependency> 16 </dependencies> 17 </project>
2.新建生產者類,生產發送消息
1 package com.jxjdemo.mq.simple; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 public class SimpleProducer { 8 public static void main(String args[]) throws Exception{ 9 //1、創建鏈接工廠對象-factory=newConnectionFactory()。創建鏈接用 10 ConnectionFactory factory = new ConnectionFactory(); 11 12 //2、設置RabbitMQ服務主機地址,預設localhost-factory.setHost("localhost") 13 factory.setHost("192.168.211.128"); 14 //3、設置RabbitMQ服務埠,預設-1-factory.setPort(5672) 15 factory.setPort(5672); 16 //4、設置虛擬主機名字,預設/-factory.setVirtualHost("szitheima") 17 factory.setVirtualHost("shujuku1122"); 18 //5、設置用戶連接名,預設guest-factory.setUsername("admin") 19 factory.setUsername("admin"); 20 //6、設置鏈接密碼,預設guest-factory.setPassword("admin") 21 factory.setPassword("123456"); 22 // factory.setConnectionTimeout(5000); 23 // factory.setWorkPoolTimeout(5000); 24 // factory.setHandshakeTimeout(5000); 25 //7、創建鏈接-connection=factory.newConnection() 26 Connection connection = factory.newConnection(); //報錯,拋異常 27 //8、創建頻道-channel=connection.createChannel() 28 Channel channel = connection.createChannel(); 29 //9、聲明隊列-channel.queueDeclare(名稱,是否持久化(true先存硬碟,讀完再刪),是否獨占本連接,是否自動刪除(false讀完再刪),附加參數) 30 channel.queueDeclare("simplequeue", true, false, false, null); 31 //10、創建消息-Stringm=xxx 32 String msg = "這是我們第一次發送 MQ消息"; 33 //11、消息發送-channel.basicPublish(交換機[預設DefaultExchage],路由key[簡單模式可以傳遞隊列名稱],消息其它屬性,消息內容) 34 channel.basicPublish("", "simplequeue", null, msg.getBytes("utf-8")); 35 //12、關閉資源-channel.close();connection.close() 36 channel.close(); 37 connection.close(); 38 } 39 }
執行後發個消息,沒看到異常。
擴展:這裡遇到的異常有,時間超時
解決方法一:
發送不成功報錯,就先重啟MQ,在重啟【管理員的方式啟動】IDE,一般都是MQ的問題。
發送消息為空,消息不能有空格。註意庫名字。
解決方法二:
我們安裝系統會給系統起個名字導致:修改後的主機名並沒有在linux系統的hosts文件中,因此解析的時候,無法直接從該文件中獲取,需要多重解析,才能解析該主機名。
不同的linux版本,這個配置文件也可能不同vim /etc/hosts
繼續說發送成功的事情。
3.創建消費者,接收消息。
1 package com.jxjdemo.mq.simple;
2 3 import com.rabbitmq.client.*; 4 5 import javax.security.auth.callback.Callback; 6 import java.io.IOException; 7 import java.util.concurrent.TimeoutException; 8
//這裡刪除了文檔註釋
16 public class SimpleConsumer { 17 public static void main(String args[]) throws IOException, TimeoutException { 18 //1、創建鏈接工廠對象-factory=newConnectionFactory() 19 ConnectionFactory factory = new ConnectionFactory(); 20 //2、設置RabbitMQ服務主機地址,預設localhost-factory.setHost("localhost") 21 factory.setHost("192.168.211.128"); 22 //3、設置RabbitMQ服務埠,預設-1-factory.setPort(5672) 23 factory.setPort(5672); 24 //4、設置虛擬主機名字,預設/-factory.setVirtualHost("szitheima") 25 factory.setVirtualHost("shujuku1122"); 26 //5、設置用戶連接名,預設guest-factory.setUsername("admin") 27 factory.setUsername("admin"); 28 //6、設置鏈接密碼,預設guest-factory.setPassword("admin") 29 factory.setPassword("123456"); 30 //7、創建鏈接-connection=factory.newConnection() 31 Connection connection = factory.newConnection(); 32 //8、創建頻道-channel=connection.createChannel() 33 Channel channel = connection.createChannel(); 34 //9、聲明隊列-channel.queueDeclare(名稱,是否持久化,是否獨占本連接,是否自動刪除,附加參數) 35 channel.queueDeclare("simplequeue",true ,false , false,null ); 36 //10接收消息 37 Consumer callback = new DefaultConsumer(channel){ 38 /** 39 * @param consumerTag 消費者標簽,在channel.basicConsume時候可以指定 40 * @param envelope 信封,消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標誌(收到消息失敗後是否需要重新發送) 41 * @param properties 屬性信息(生產者的發送時指定) 42 * @param body 消息內容 43 * @throws IOException 44 */ 45 @Override 46 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 47 Long deliveryTag = envelope.getDeliveryTag(); //消息ID 48 String exchange = envelope.getExchange(); 49 String routingKey = envelope.getRoutingKey(); //路由KEY 50 //消息內容 51 String msg = new String(body,"utf-8"); 52 System.out.println( 53 "routingKey:" + routingKey + 54 "routingKey:" + routingKey + 55 ",exchange:" + exchange + 56 ",deliveryTag:" + deliveryTag + 57 ",message:" + msg); 58 } 59 }; 60 channel.basicConsume("simplequeue", callback); 61 //不關閉,繼續接受消息 62 } 63 }
執行後看到一下結果
當你的代碼運行到這裡,那麼恭喜你入門成功。
這次暫時先到這裡結束。欲知其他4種模式且看下回慢慢分解。