聲明 轉載請註明出處! Reprint please indicate the source! http://www.hiknowledge.top/?p=86&preview=true 什麼是JMS JMS即Java消息服務(Java Message Service)應用程式介面,是一個Java平 ...
聲明 轉載請註明出處! Reprint please indicate the source!
http://www.hiknowledge.top/?p=86&preview=true
MessageQueue是分散式的系統里經常要用到的組件。
什麼是JMS
JMS即Java消息服務(Java Message Service)應用程式介面,是一個Java平臺中關於面向消息中間件(MOM)的API,用於在兩個應用程式之間,或分散式系統中發送消息,進行非同步通信。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。
(引用自百度百科)
JMS 在其中扮演的角色與JDBC 很相似,正如JDBC 提供了一套用於訪問各種不同關係資料庫的公共API,JMS 也提供了獨立於特定廠商的企業消息系統訪問方式。JMS 的編程過程很簡單,概括為:應用程式A 發送一條消息到消息伺服器(也就是JMS Provider)的某個目得地(Destination),然後消息伺服器把消息轉發給應用程式B。因為應用程式A 和應用程式B 沒有直接的代碼關連,所以兩者實現瞭解偶。
(引用自博客)
JMS的用途
- 解耦
- 數據的可靠傳輸
- 保證數據不重發,不丟失
- 能夠實現跨平臺操作,能夠為不同操作系統上的軟體集成數據傳送服務。
消息的傳遞模型
JMS支持兩種消息傳遞模型:
點對點(point-to-point,簡稱PTP)和發佈/訂閱(publish/subscribe,簡稱pub/sub)。這兩種消息傳遞模型非常相似,但有以下區別:
- a. PTP消息傳遞模型規定了一條消息之恩能夠傳遞費一個接收方。
- b. Pub/sub消息傳遞模型允許一條消息傳遞給多個接收方 每個模型都通過擴展公用基類來實現。例如:javax.jms.Queue和Javax.jms.Topic都擴展自javax.jms.Destination類。
上面兩種消息傳遞模型里,我們都需要定義消息生產者和消費者,生產者吧消息發送到JMS Provider的某個目標地址(Destination),消息從該目標地址傳送至消費者。消費者可以同步或非同步接收消息,一般而言,非同步消息消費者的執行和伸縮性都優於同步消息接收者,體現在:
- 非同步消息接收者創建的網路流量比較小。單向對東消息,並使之通過管道進入消息監聽器。管道操作支持將多條消息聚合為一個網路調用。
- 非同步消息接收者使用線程比較少。非同步消息接收者在不活動期間不使用線程。同步消息接收者在接收調用期間內使用線程,結果線程可能會長時間保持空閑,尤其是如果該調用中指定了阻塞超時。
- 對於伺服器上運行的應用程式代碼,使用非同步消息接收者幾乎總是最佳選擇,尤其是通過消息驅動Bean。使用非同步消息接收者可以防止應用程式代碼在伺服器上執行阻塞操作。而阻塞操作會是伺服器端線程空閑,甚至會導致死鎖。阻塞操作使用所有線程時則發生死鎖。如果沒有空餘的線程可以處理阻塞操作自身解鎖所需的操作,這該操作永遠無法停止阻塞。
(引用自博客)
什麼是ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息匯流排。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出台已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。
(引用自百度百科)
權威書籍
環境配置
Maven依賴
引入核心包
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
TIPS:如果你引入的是下麵的activemq-all.jar,且工程中已經引入了SLF4J,會與activemq-all.jar中的SLF4J發生衝突。
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.4</version>
</dependency>
啟動broker
要運行起來demo得先,啟動broker。我是在虛擬機上測試的。ip:192.168.235.100
進入到 執行:
activemq start
測試
訪問頁面 http://192.168.235.100:8161/
預設用戶名/密碼:admin/admin
介面介紹
由於歷史原因,JMS提供有四套API介面。
- JMS1.0 定義了兩個域相關的API,queue、topic。
- JMS1.1 引入了一組新API,也叫做傳統API。
- JMS2.0 引入了一組簡化API,擁有傳統API所有的特性,同時介面更少、使用更方便。
每組API挺一組不同的介面集合,擁有連接到JMS提供者、發送和接收消息。它們共用一組代表消息、消息目的地和其他各方面功能特性的通用介面。所有的介面都在javax.jms下。
傳統API介面
傳統API介面模型圖
Destination 介面是Queue和Topic的頂層介面。
介面 | 用處 |
---|---|
ConnectionFactory | 用戶用來創建到JMS提供者的連接的被管對象。JMS客戶通過可移植的介面訪問連接,這樣當下層的實現改變時,代碼不需要進行修改。 管理員在JNDI名字空間中配置連接工廠,這樣,JMS客戶才能夠查找到它們。根據消息類型的不同,用戶將使用隊列連接工廠,或者主題連接工廠。 |
Connection | 連接代表了應用程式和消息伺服器之間的通信鏈路。在獲得了連接工廠後,就可以創建一個與JMS提供者的連接。根據不同的連接類型,連接允許用戶創建會話,以發送和接收隊列和主題到目標。 |
Session | 表示一個單線程的上下文,用於發送和接收消息。由於會話是單線程的,所以消息是連續的,就是說消息是按照發送的順序一個一個接收的。會話的好處是它支持事務。如果用戶選擇了事務支持,會話上下文將保存一組消息,直到事務被提交才發送這些消息。在提交事務之前,用戶可以使用回滾操作取消這些消息。一個會話允許用戶創建消息生產者來發送消息,創建消息消費者來接收消息。 |
Destination | 目標是一個包裝了消息目標標識符的被管對象,消息目標是指消息發佈和接收的地點,或者是隊列,或者是主題。JMS管理員創建這些對象,然後用戶通過JNDI發現它們。和連接工廠一樣,管理員可以創建兩種類型的目標,點對點模型的隊列,以及發佈者/訂閱者模型的主題。 |
MessageConsumer | 由會話創建的對象,用於接收發送到目標的消息。消費者可以同步地(阻塞模式),或非同步(非阻塞)接收隊列和主題類型的消息。 |
MessageProducer | 由會話創建的對象,用於發送消息到目標。用戶可以創建某個目標的發送者,也可以創建一個通用的發送者,在發送消息時指定目標。 |
Message | 是在消費者和生產者之間傳送的對象,也就是說從一個應用程式創送到另一個應用程式。一個消息有三個主要部分:消息頭(必須):包含用於識別和為消息尋找路由的操作設置。一組消息屬性(可選):包含額外的屬性,支持其他提供者和用戶的相容。可以創建定製的欄位和過濾器(消息選擇器)。 |
一個消息體(可選):允許用戶創建五種類型的消息(文本消息,映射消息,位元組消息,流消息和對象消息)。消息介面非常靈活,並提供了許多方式來定製消息的內容。
簡化API接
簡化API介面模型圖
demo註意:ActiveMQ是沒有實現簡化版介面的。不僅ActiveMQ,很多廠商也沒有支持簡化版API介面。
點對點模式
點對點模式,有點類似關係資料庫。從編程角度,它裡面的Acknowledge,就類似於資料庫的commit。Connection連接、Session會話、工廠模式等,在設計上與資料庫很像。
ActiveMQ中Queue實現了點對點模型。
JMSProducer.java
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created with IntelliJ IDEA.<br>
* Description: JMS ActiveMQ Demo測試 消息生產者<br>
* 運行前,需要打開本地的activemq。
* 如果需要更改broker地址,要提前運行相應的broker。
* User: jahen<br>
* Date: 2017-04-02<br>
* Time: 11:06<br>
*/
public class JMSProducer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 預設連接
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 預設密碼
// private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 預設連接地址 為 failover://tcp://localhost:61616
private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定連接地址 (my VM)
private static final int SENDNUM = 10; // 發送的消息數量
public static void main(String[] args) {
ConnectionFactory connectionFactory; // 連接工程,生產Connection
Connection connection = null; // 連接
Session session; // 會話 接受或者發送消息的線程
Destination destination; // 消息的目的地
MessageProducer messageProducer; // 消息生產者
// 實例化連接工廠
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
// 創建連接
try {
connection = connectionFactory.createConnection();
connection.start(); // 啟動連接
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 提交事務,自動確認
destination = session.createQueue("FirstQueue"); // 創建消息隊列
messageProducer = session.createProducer(destination); // 創建消息發送者
sendMessage(session, messageProducer); // 發送消息
session.commit(); // 提交事務
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (connection!=null)
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
/**
* 發送消息
* @param session 會話
* @param messageProducer 消息生產者
*/
private static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
for (int i=0; i<JMSProducer.SENDNUM; i++) {
TextMessage message = session.createTextMessage("ActiveMQ 發送的消息 "+i);
System.out.println("發送消息: ActiveMQ 發送的消息 "+i);
messageProducer.send(message);
}
}
}
運行一下JMSProudcer,生產10條消息。
JMSConsumer.java
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import javax.jms.*;
/**
* Created with IntelliJ IDEA.<br>
* Description: 消息消費者1-點對點模式<br>
* 實現方式1 迴圈檢測<br>
* User: jahen<br>
* Date: 2017-04-02<br>
* Time: 13:44<br>
*/
public class JMSConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 預設連接
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 預設密碼
// private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 預設連接地址 為 failover://tcp://localhost:61616
private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定連接地址 (my VM)
public static void main(String args[]) {
ConnectionFactory connectionFactory; // 連接工程,生產Connection
Connection connection = null; // 連接
Session session; // 會話 接受或者發送消息的線程
Destination destination; // 消息的目的地
MessageConsumer messageConsumer; // 消息消費者
// 實例化連接工廠
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消費消息不需要事務,自動確認
destination = session.createQueue("FirstQueue"); // 創建消息隊列
messageConsumer = session.createConsumer(destination); // 創建消息消費者
while (true) {
TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);// 設置延時為100s
if (textMessage!=null) { // 接收到消息
System.out.println("接收的消息:"+textMessage.getText());
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
運行一下JMSConsumer,消費10條消息。
這種方式消費消息,通過迴圈檢查,顯然是不高明的。
下麵,通過設置監聽的方式,實現消息消費。
再次運行一下JMSProudcer,生產10條消息。
首先實現一下監聽器
Listenr.java
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* Created with IntelliJ IDEA.<br>
* Description: 消息監聽者<br>
* User: jahen<br>
* Date: 2017-04-02<br>
* Time: 14:30<br>
*/
public class Listener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
System.out.println("收到消息:" + ((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
JMSConsumer2.java
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created with IntelliJ IDEA.<br>
* Description: 消息消費者2-點對點模式<br>
* 實現方式2 設置監聽<br>
* User: jahen<br>
* Date: 2017-04-02<br>
* Time: 13:44<br>
*/
public class JMSConsumer2 {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 預設連接
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 預設密碼
// private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 預設連接地址 為 failover://tcp://localhost:61616
private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定連接地址 (my VM)
public static void main(String args[]) {
ConnectionFactory connectionFactory; // 連接工程,生產Connection
Connection connection = null; // 連接
Session session; // 會話 接受或者發送消息的線程
Destination destination; // 消息的目的地
MessageConsumer messageConsumer; // 消息消費者
// 實例化連接工廠
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消費消息不需要事務,自動確認
destination = session.createQueue("FirstQueue"); // 創建消息隊列
messageConsumer = session.createConsumer(destination); // 創建消息消費者
messageConsumer.setMessageListener(new Listener());// 註冊消息監聽
} catch (JMSException e) {
e.printStackTrace();
}
}
}
運行一下JMSConsumer2,新產生的消息被消費了。
消息發佈/訂閱模式
發佈/訂閱模式是一對多的關係。
註意:發佈/訂閱要先運行訂閱,再運行發佈才能收到消息。
發佈者和訂閱者之間有時間上的依賴性。針對某個主題Topic的訂閱者,它必須創建一個訂閱者之後,才能消費發佈者的消息,而且為了消費消息,訂閱者必須保持運行的狀態。
Topic 實現了發佈/訂閱模型。
JMSConsumer.java
package com.jahentao.activemq;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created with IntelliJ IDEA.<br>
* Description: 消息消費者-發佈訂閱模式 消息訂閱者<br>
* 實現方式 設置監聽<br>
* 消息訂閱者1<br>
* User: jahen<br>
* Date: 2017-04-02<br>
* Time: 13:44<br>
*/
public class JMSConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 預設連接
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 預設密碼
// private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 預設連接地址 為 failover://tcp://localhost:61616
private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定連接地址 (my VM)
public static void main(String args[]) {
ConnectionFactory connectionFactory; // 連接工程,生產Connection
Connection connection = null; // 連接
Session session; // 會話 接受或者發送消息的線程
Destination destination; // 消息的目的地
MessageConsumer messageConsumer; // 消息消費者
// 實例化連接工廠
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消費消息不需要事務,自動確認
// destination = session.createQueue("FirstQueue"); // 創建消息隊列
destination = session.createTopic("FirstTopic"); // 創建消息訂閱者
messageConsumer = session.createConsumer(destination); // 創建消息消費者
messageConsumer.setMessageListener(new Listener());// 註冊消息監聽
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Listener.java
package com.jahentao.activemq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* Created with IntelliJ IDEA.<br>
* Description: 訂閱者1消息監聽器<br>
* User: jahen<br>
* Date: 2017-04-02<br>
* Time: 14:46:52<br>
*/
public class Listener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
System.out.println("訂閱者一 收到消息:" + ((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
JMSConsumer2.java
package com.jahentao.activemq;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created with IntelliJ IDEA.<br>
* Description: 消息消費者-發佈訂閱模式 消息訂閱者<br>
* 實現方式 設置監聽<br>
* 消息訂閱者2<br>
* User: jahen<br>
* Date: 2017-04-02<br>
* Time: 13:44<br>
*/
public class JMSConsumer2 {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 預設連接
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 預設密碼
// private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 預設連接地址 為 failover://tcp://localhost:61616
private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定連接地址 (my VM)
public static void main(String args[]) {
ConnectionFactory connectionFactory; // 連接工程,生產Connection
Connection connection = null; // 連接
Session session; // 會話 接受或者發送消息的線程
Destination destination; // 消息的目的地
MessageConsumer messageConsumer; // 消息消費者
// 實例化連接工廠
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消費消息不需要事務,自動確認
// destination = session.createQueue("FirstQueue"); // 創建消息隊列
destination = session.createTopic("FirstTopic"); // 創建消息訂閱者
messageConsumer = session.createConsumer(destination); // 創建消息消費者
messageConsumer.setMessageListener(new Listener2());// 註冊消息監聽
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Listener2.java
package com.jahentao.activemq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* Created with IntelliJ IDEA.<br>
* Description: 訂閱者2消息監聽器<br>
* User: jahen<br>
* Date: 2017-04-02<br>
* Time: 14:46:52<br>
*/
public class Listener2 implements MessageListener {
@Override
public void onMessage(Message message) {
try {
System.out.println("訂閱者二 收到消息:" + ((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
首先,分別運行JMSConsumer、JMSConsumer2進行訂閱。
JMSProducer.java
package com.jahentao.activemq;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created with IntelliJ IDEA.<br>
* Description: JMS ActiveMQ Demo測試 發佈訂閱模式 消息發佈者<br>
* 運行前,需要打開本地的activemq。
* 如果需要更改broker地址,要提前運行相應的broker。
* User: jahen<br>
* Date: 2017-04-02<br>
* Time: 14:42:59<br>
*/
public class JMSProducer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 預設連接
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 預設密碼
// private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 預設連接地址 為 failover://tcp://localhost:61616
private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定連接地址 (my VM)
private static final int SENDNUM = 10; // 發送的消息數量
public static void main(String[] args) {
ConnectionFactory connectionFactory; // 連接工程,生產Connection
Connection connection = null; // 連接
Session session; // 會話 接受或者發送消息的線程
Destination destination; // 消息的目的地
MessageProducer messageProducer; // 消息生產者
// 實例化連接工廠
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
// 創建連接
try {
connection = connectionFactory.createConnection();
connection.start(); // 啟動連接
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 提交事務,自動確認
// destination = session.createQueue("FirstQueue"); // 創建消息隊列
destination = session.createTopic("FirstTopic"); // 創建主題
messageProducer = session.createProducer(destination); // 創建消息發送者
sendMessage(session, messageProducer); // 發送消息
session.commit(); // 提交事務
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (connection!=null)
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
/**
* 發送消息
* @param session 會話
* @param messageProducer 消息生產者
*/
private static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
for (int i = 0; i< JMSProducer.SENDNUM; i++) {
TextMessage message = session.createTextMessage("ActiveMQ 發送的消息 "+i);
System.out.println("發送消息: ActiveMQ 發送的消息 "+i);
messageProducer.send(message);
}
}
}
然後運行JMSProducer。
參考
java1234上發佈的教程"一頭扎進ActiveMQ"
這裡學習的源碼,托管在碼雲上