JMS(Java Message Service,Java消息服務)是一組Java應用程式介面(Java API),它提供創建、發送、接收、讀取消息的服務。它給消息中間件生產商提供了一個統一API的標準。 ...
本文代碼使用ActiveMq5.6
一、什麼是JMS
JMS(Java Message Service,Java消息服務)是一組Java應用程式介面(Java API),它提供創建、發送、接收、讀取消息的服務。它給消息中間件生產商提供了一個統一API的標準。
第一個版本1998年,目前最新的2.0版本(2013年第一季度發佈)
JMS規範地址:
http://www.oracle.com/technetwork/java/docs-136352.html(1.1版本)
http://download.oracle.com/otndocs/jcp/jms-2_0-fr-eval-spec/index.html(2.0版本)
中間件:中間件是一種獨立的系統軟體或服務程式,分散式應用軟體藉助這種軟體在不同的技術之間共用資源。也就是應用程式A與應用程式B之間不是直接交互,而是通過中間件進行資源共用。
二、JMS消息模型
1、JMS元素
-
JMS provider:An implementation of the JMS interface for a Message Oriented Middleware (MOM) 面向消息中間件的JMS介面的實現
-
JMS client:An application or process that produces and/or receives messages. 產生和/或接收消息的應用程式或過程
-
JMS producer/publisher:A JMS client that creates and sends messages. 創建和發送消息的JMS客戶端。
-
JMS consumer/subscriber:A JMS client that receives messages. 接收消息的JMS客戶機。
-
JMS message(header/payload):An object that contains the data being transferred between JMS clients. 包含JMS客戶機之間傳輸數據的對象
-
JMS queue:A staging area that contains messages that have been sent and are waiting to be read. A JMS queue only guarantees that each message is processed only once.包含已發送並等待被讀取的消息的暫存區域。JMS隊列只保證每個消息只處理一次。
-
JMS topic:A distribution mechanism for publishing messages that are delivered to multiple subscribers. 發佈消息傳遞給多個訂閱伺服器的分發機制。
2、JMS消息模型
a、點對點的消息模型: 每個消息只能有一個消費者。
消息的生產者和消費者之間沒有時間上的相關性。無論消費者在生產者發送消息的時候是否處於運行狀態,它都可以提取消息。
b、發佈訂閱消息模型 每個消息可以有多個消費者
生產者和消費者之間有時間上的相關性。訂閱一個主題的消費者只能消費自它訂閱之後發佈的消息。JMS規範允許客戶創建持久訂閱,這在一定程度上放鬆了時間上的相關性要求。持久訂閱允許消費者消費它在未處於激活狀態時發送的消息。
三、JMS消息介面
JMS支持兩種消息類型P -to- P 和 Pub-Sub 這兩種消息類型都繼承統一介面JMS Parent,主要介面是
JMS Parent |
P -to -P |
Pub - Sub |
ConnectionFactory |
QueueConnectionFactory |
TopicConnectionFactory |
Connection |
QueueConnection |
TopicConnection |
Destination |
QueueDestination |
TopicDestination |
Session |
QueueSession |
TopicSession |
MessageProducer |
QueueSender |
TopicPublisher |
MessageConsumer |
QueueReceiver |
TopicSubscriber |
1、介面描述:
ConnectionFactory:連接工廠,JMS創建連接的方式
Connection:JMS客戶端與JMS Provider的連接(通過ConnectionFactory創建的)。
Destination:消息的目的地
Session:一個接收或發送消息的一次回話
MessageProducer:由session對象創建的用於發送消息的對象
MessageConsumer:由session對象創建的用來接收消息的對象
2、JMS的創建流程
- 創建ConnectionFactory工廠
- 通過工廠創建Connection連接
- 通過連接創建一個連接回話Session
- 通過Session創建消息的生產者MessageProducer和消息的消費者MessageConsumer;同時Session也創建一個消息Mesage。
- 消息的生產者MessageProducer將該消息發送到目的地中Destination;消費者同時監聽該消息目的地Destination。
P -to- P模型:
生產者:
package com.jalja.org.base.JMS; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class ProducerJMS { private static String url="http://localhost:8161";//ActiveMq的地址 private static String queueName="queue-ch2";//創建一個隊列 public static void main(String[] args) throws JMSException { // ConnectionFactory connFactory = new ActiveMQConnectionFactory(); Connection conn = connFactory.createConnection(); conn.start();//開啟連接 //創建Session Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //創建一個p -to -P 類型的消息隊列(消息目的地) Destination des = session.createQueue(queueName); //將消息交給消息發送者 MessageProducer producer = session.createProducer(des); //創建一個text類型的消息 TextMessage msg = session.createTextMessage(); msg.setText("Hello World!"); //將消息發送到消息隊列中 producer.send(msg); //關閉資源 session.close(); conn.close(); } }
消費者
package com.jalja.org.base.JMS; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class ConsumerA { private static String url = "tcp://localhost:8161"; private static String queueName = "queue-ch2"; public static void main(String[] args) throws Exception { ConnectionFactory connFactory = new ActiveMQConnectionFactory(); Connection conn = connFactory.createConnection(); conn.start();//啟動連接 //創建Session Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //創建一個p -to -P 類型的消息隊列(消息目的地) Destination des = session.createQueue(queueName); //創建一個消息消費者 並指定其消費信息的目的地 MessageConsumer consumer = session.createConsumer(des); //監聽消息隊列 Listener listener = new Listener(); listener.setForm("ConsumerA"); consumer.setMessageListener(listener); } }
消息監聽器:
package com.jalja.org.base.JMS; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class Listener implements MessageListener { private String form = null; /** * 監聽消息隊列,如果該消息隊列有消息,該監聽器就可以獲取消息 */ @Override public void onMessage(Message message) { TextMessage msg = (TextMessage)message; try { System.out.println(getForm() + ":" + msg.getText()); } catch (JMSException e) { e.printStackTrace(); } } public String getForm() { return form; } public void setForm(String form) { this.form = form; } }
Pub - Sub 模型
生產者:
package ch02.pubsub; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /* * pubsub MQ生產者 */ public class Publisher { private static String url = "tcp://localhost:8161"; private static String topicName = "topic.ch02"; public static void main(String[] args) throws Exception { // TODO Auto-generated method stub ConnectionFactory connFactory = new ActiveMQConnectionFactory(); Connection conn = connFactory.createConnection(); conn.start(); Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination des = session.createTopic(topicName); MessageProducer publisher = session.createProducer(des); TextMessage msg = session.createTextMessage(); msg.setText("Hello World!"); publisher.send(msg); session.close(); conn.close(); } }
消費者:
package ch02.pubsub; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; /** * * PTP 消費者A * */ public class SubscriberA { private static String url = "tcp://localhost:8161"; private static String topicName = "topic.ch02"; /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { // TODO Auto-generated method stub ConnectionFactory connFactory = new ActiveMQConnectionFactory(); Connection conn = connFactory.createConnection(); conn.start(); Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination des = session.createTopic(topicName); MessageConsumer consumer = session.createConsumer(des); Listener listener = new Listener(); listener.setForm("SubscriberA"); consumer.setMessageListener(listener); } }
消息監聽器:
package ch02.pubsub; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class Listener implements MessageListener { private String form = null; @Override public void onMessage(Message message) { TextMessage msg = (TextMessage)message; try { System.out.println(getForm() + ":" + msg.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public String getForm() { return form; } public void setForm(String form) { this.form = form; } }