消息中間件 消息中間件是指利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並且基於數據通信來進行分散式系統的集成。通過提供消息傳遞和消息排隊模型,可以在分散式架構下擴展進程之間的通信。 消息中間件能做什麼 消息中間件主要解決分散式系統之間消息的傳遞問題 ,能夠屏蔽各種平臺以及協議之間的特性,實現應 ...
消息中間件
消息中間件是指利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並且基於數據通信來進行分散式系統的集成。通過提供消息傳遞和消息排隊模型,可以在分散式架構下擴展進程之間的通信。
消息中間件能做什麼
消息中間件主要解決分散式系統之間消息的傳遞問題 ,能夠屏蔽各種平臺以及協議之間的特性,實現應用之間的協同。
示例:
電商平臺中的註冊功能,用戶註冊不單是向資料庫insert,可能還需要贈送積分,發送郵件,發送簡訊等系列操作。
假如:每個操作都耗時1s,那麼註冊過程就需要耗時4s才能響應給用戶。從註冊這個服務可以看出,每個子操作都是獨立的,同時,基於領域劃分以後,它們都屬於不同的子域。所以我們可以對這些子操作實現非同步化操作。類似多線程並行處理。
如何實現非同步化?用多線程能實現嗎?多線程當然可以實現,只是,消息的持久化、消息的重發這些條件,多線程 並不能滿足.所以需要藉助一些開源的消息中間件來解決。 而分散式消息隊列就是一個很好的解決辦法。通過引入分散式隊列,大大提升程式的處理效率,並且還解決了各個模塊之間的耦合問題。
分散式消息隊列解決的場景:
引入消息中間件後(非同步處理),電商平臺中的註冊架構圖變為
電商中的秒殺:
用戶提交過來的請求,先寫入消息隊列。消息隊列是有長度的,如果消息隊列超過指定長度,直接拋棄。
秒殺的 具體核心處理業務,接收消息隊列中消息進行處理。這裡的消息處理能力取決於消費端本身的吞吐量。
解耦、非同步化、流量整形、數據的最終一致性(最大化的重試完成數據一致性)
ActiveMQ 簡介
ActiveMQ
ActiveMQ 是完全基於JMS 規範實現的一個消息中間件產品,是Apache 開源基金會研發的消息中間件。ActiveMQ 主要應用在分散式系統架構中,幫助構建高可用、高性能、可伸縮的企業級面向服務的系統。
ActiveMQ 特性
-
多語言和協議編寫客戶端
-
語言:Java、C、C++、C#、Ruby、Perl、Python、PHP
-
協議:openwire、stomp、REST、ws、notification、xmpp、AMQP
-
-
完全支持JMS1.1和J2EE1.4規範
-
對Spring的支持,ActiveMQ可以很容易的嵌入到spring模塊中
ActiveMQ 下載安裝啟動
下載地址
http://activemq.apache.org/activemq-5158-release.html
解壓
tar -zxvf apache-activemq-5.15.8-bin.tar.gz
啟動服務
-
cd apache-activemq-5.15.8/bin
sh activemq start
-
啟動並帶指定日誌文件 sh activemq start > /tmp/activemqlog
關閉服務
-
sh activemq stop
監控地址
http://192.168.15.134:8161/admin/ admin admin
ActiveMQ 的埠61616
-
預設為61616
-
檢查是否成功啟動ActiveMQ
-
netstat -an|grep 61616
-
JMS 基本概念和模型
JMS的定義
JMS(Java Message Service) :面向消息中間件的API
MOM(Message Oriented Middleware):面向消息中間件
Java 消息服務是Java平臺中關於面向消息中間件的API,用於兩個程式 之間,或者分散式系統中發送消息,進行非同步通信。
JMS 是一個與具體平臺無關的API,絕大多數MOM 提供商都對JMS提供了支持。ActiveMQ就是其中的一個實現。
MOM
MOM 是面向消息的中間件,使用消息傳送提供者來協調消息傳送操作。 MOM 需要提供API和管理工具。客戶端使用API調用,把消息發送到由提供者管理的目的地。在發送消息後,客戶端會繼續執行其他工作,並且在接收方收到這個消息確認之前,提供者一直保留該消息。
MOM 的特點
-
消息非同步接收,發送者不需要等待消息接受者響應
-
消息可靠接收,確保消息中間件可靠保存。只有接收方收到消息後才刪除消息
開源JMS提供商
JbossMQ(jboss4)、Jboss messaging(jboss5)、joram、ubermq、mantamq、openjms ...
JMS 規範
JMS 規範的目的是為了使得Java 應用程式能夠訪問現有MOM(消息中間件)系統,形成一套統一的標準規範,解決不同消息中間件之間的協作問題。
-
不同消息的傳遞域,點對點消息傳送和發佈/訂閱消息傳送
-
提供接收同步和非同步消息的工具
-
對可靠消息傳送的支持
-
常見消息格式,例如流、文本和位元組
JMS 的體繫結構
JMS 的基本功能
JMS 的基本功能是用於和麵向消息中間件相互通信的應用程式的介面
消息傳遞域
-
p2p(point-2-point) 點對點消息傳遞域
-
每個消息只能有一個消費者(離線存儲)
-
類似QQ聊天的私聊
-
-
生產者和消費者之間沒有時間上的相關性,無論消費者在生產者發送消息的時候是否處於運行狀態,都可以提取消息
-
如果session關閉時,有一些消息已經被收到,但是沒有被簽收,消費者下一次連接到相同對列時,這些消息仍然會被接收
-
如果用戶在receive 方法中設定了消息的選擇條件(消息過濾)
-
如果是持久化消息,消息會被持久化保存,直到消息被簽收
-
-
發佈訂閱(publish/subscribe)消息傳遞域
-
每個消息有多個消費者
-
類似QQ群聊
-
-
生產者和消費者有時間上的相關性
-
訂閱一個主題的消費者只能消費自它訂閱之後發佈的消息。
-
JMS 規範允許客戶創建持久訂閱,一定程度上降低了時間的相關性要求
-
持久訂閱允許消費者消費它在未處於激活狀態時發送的消息
-
-
持久化訂閱和非持久化訂閱
-
在非持久化訂閱的前提下,不能恢復或者重新指派一個未簽收的消息;
-
如果所有消息必須要簽收,則使用持久訂閱
-
消息的組成
消息頭(Header)
消息頭包含消息的識別信息和路由信息
消息頭包含一些標準的屬性:
-
JMSDestination
-
消息發送的目的地,queue或者topic
-
-
JMSDeliveryMode
-
傳送模式,持久化模式和非持久模式
-
-
JMSPrority
-
消息優先順序(優先順序分為10個級別,從0最低-9最高)
-
如果不設定優先順序,預設級別4,需要註意的是,JMS Provider 並不一定保證按照優先順序的順序提交
-
-
JMSMessageID
-
唯一識別每個消息的標識
-
消息體
就是我們需要傳遞的消息的內容
JMS API定義了5種消息體格式:
-
TextMessage
-
java.lang.String 對象,如xml文件內容
-
-
MapMessage
-
名/值對的集合,名是String 對象,值可以是Java 任何基本類型
-
-
BytesMessage
-
位元組流
-
-
StreamMessage
-
Java 中的輸入輸出流
-
-
ObjectMessage
-
Java 中的可序列化對象
-
-
Message
-
沒有消息體,只有消息頭和屬性
-
消息的屬性
按類型分為:
-
應用設置的屬性
-
Message.setStringProperty(key,value);
-
-
標準屬性
-
使用“JMSX” 作為屬性名的首碼
-
-
消息中間件定義的屬性
-
JMS Provider 特定的屬性
-
JMS 的可靠機制
消息的確認方式
消息的處理階段:
-
客戶端接收消息
-
客戶端處理消息
-
消息被確認
會話存在兩種機制:
-
事務性會話
-
createSession(boolean transacted, int acknowledgeMode)
-
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
-
-
session.commit() //消息被確認 事務提交意味著生產的所有消息被髮送,消費的所有消息被確認
-
session.rollback(); //重新處理 消息沒有被提交,沒有被處理,消費端的所有消息被恢復,並且重新被提交, 表示一個事務結束, 另一個事務會開始。事務回滾意味著生產的所有消息被銷毀,消費的所有消息 被恢復並重新提交,除非它們已經過期
-
通過session.commit() //完成事務的簽收
-
-
非事務性會話
-
transacted 設置為FALSE
-
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
-
客戶端簽收模型
-
Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
-
那麼需要手動簽收
-
textMessage.acknowledge();
-
客戶端延遲確認,消息可能重覆消費
-
Session session = connection.createSession(Boolean.FALSE, DUPS_OK_ACKNOWLEDGE);
-
-
事務性的自動確認
非事務性的自動確認和手動確認
消息的持久化存儲
持久化(存儲在資料庫或磁碟)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
對於持久消息,消息提供者會使用存儲-轉發機制,先將消息存儲到穩定的介質中,等消息發送成功後再刪除。如果JMS Provider 宕機,那麼這些未送達的消息則不會丟失,JMS Provider 恢復正常後,會重新讀取這些消息,並傳送給對應的消費者。
非持久化(存儲在記憶體中)
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
對於非持久化消息,JMS Provider 不會將它存到文件、資料庫等穩定介質中。也就是說非持久消息,存儲在記憶體中,如果JMS Provider 宕機,那麼非持久化消息會丟失。
持久訂閱
-
持久訂閱者和非持久訂閱者針對的Domain 是Pub/Sub,而不是P2P
-
當Broker 發送消息給訂閱者時,如果訂閱者處於未激活狀態,持久訂閱者可以收到消息,而非持久訂閱者則收不到消息。
-
當持久訂閱者處於未激活狀態時,Broker 需要為持久訂閱者保存消息,如果持久訂閱者訂閱的消息太多則會溢出。
-
持久訂閱時,客戶端向JMS 伺服器註冊一個自己身份的ID, 當這個客戶端處於離線時,JMS Provider 會為這個ID 保存所有發送到主題的消息,當客戶再次連接到 JMS Provider時,會根據自己的ID得到所有當自己處於離線時發送到主題的消息。
-
持久訂閱的方式(消費端)
-
connection.setClientID("test");
-
Topic destination=session.createTopic("myTopic");
-
MessageConsumer consumer=session.createDurableSubscriber(destination,"test");
案例架構圖
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.8</version> </dependency>
生產端
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class JMSQueueProducer { public static void main(String args[]) { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.15.134:61616"); Connection connection = null; try { connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //創建目的地 Destination destination = session.createQueue("myQueue"); //創建發送者 MessageProducer producer = session.createProducer(destination); //持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); TextMessage textMessage = session.createTextMessage("Hello,World"); producer.send(textMessage); session.commit(); session.close(); } catch (JMSException e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
消費端
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import javax.xml.soap.Text; public class JMSQueueConsumer { public static void main(String args[]) { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.15.134:61616"); Connection connection = null; try { connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); //創建目的地 Destination destination = session.createQueue("myQueue"); //創建接收者 MessageConsumer consumer = session.createConsumer(destination); //接收消息 阻塞方式監聽消息 TextMessage textMessage =(TextMessage) consumer.receive(); System.out.println(textMessage.getText()); session.commit(); //表示消息被自動確認 session.close(); } catch (JMSException e) { e.printStackTrace(); }finally { if(connection!=null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }