ActiveMQ學習總結------實戰操作 1.ActiveMQ術語及API介紹 2.ActiveMQ 文本消息處理 3.ActiveMQ 對象消息處理 ...
相信大家通過上一篇博文已經對ActiveMQ有了一個大致的概念了,
那麼本篇博文將帶領大家一步一步去實戰操作我們的ActiveMQ
本篇主要內容:
1.ActiveMQ術語及API介紹
2.ActiveMQ 文本消息處理
3.ActiveMQ 對象消息處理
既然我們要學習如何實戰操作了,那麼久不可不知它的一些術語和API的用於
即使覺得枯燥,我們也要大致看一眼,過後相信你還會再來看的!因為很有用
一 ActiveMQ 的術語(這麼碉堡的東西怎麼會沒有寫讓人覺得更碉堡的辭彙呢?)
1 Destination
目的地,JMS Provider(消息中間件)負責維護,用於對 Message 進行管理的對象。
MessageProducer 需要指定 Destination 才能發送消息,MessageReceiver 需要指定 Destination
才能接收消息。
2 Producer
消息生成者,負責發送 Message 到目的地。
3 Consumer | Receiver
消息消費者,負責從目的地中消費【處理|監聽|訂閱】Message。
4 Message
消息,消息封裝一次通信的內容。
二 ActiveMQ API簡介
下述API都是介面類型,由定義在javax.jms包中
都是JMS(Java Message Service)標準介面定義
1.ConnectionFactory
鏈接工廠,用於創建鏈接的工廠類型
2.Connection
鏈接,用於建立訪問ActiveMQ連接的類型,由鏈接工廠ConnectionFactory創建
3.Session
會話,一次持久有效有狀態的訪問,由鏈接創建
4.Destination & Queue
目的地。用於描述本次訪問ActiveMQ的消息訪問目的地,即ActiveMQ服務中的具體隊列,由會話創建
interface Queue extends Destination
5.MessageProducer
消息生成者、在一次有效會話中,用於發送消息給ActiveMQ服務的工具,由會話創建
6.MessageConsumer
消息消費者【消息訂閱者|消息處理著】,在一次有效會話中,用於從ActiveMQ服務中獲取消息的工具,由會話創建
7.Message
消息。通過消息生成者向ActiveMQ服務發送消息時使用的數據載體對象或消費者從ActiveMQ服務中獲取消息時使用的數據載體對象。是所有消息【文本消息|對象消息等】具體類型的頂級介面,可以通過會話創建或通過會話從ActiveMQ服務中獲取
三 ActiveMQ處理文本消息
註:*本案例將以producer(消息生產者)和consumer(消息發送者)連個模塊為例
準備工作:
確保你的ActiveMQ已經開啟,並且所依賴的埠已經關閉(建議直接關閉防火牆 service iptables stop)
Maven環境
1 消息生產者
1.1創建模塊
1,2在POM文件添加ActiveMQ的依賴(根據個人的ActiveMQ版本自行查找)
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency> </dependencies>
1.3 編寫消息的生產者
package cn.arebirth.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQTextProducer { public void sendTextActiveMQ(String msg) { //定義鏈接工廠 ConnectionFactory connectionFactory = null; //定義鏈接對象 Connection connection = null; //定義會話 Session session = null; //目的地 Destination destination = null; //定義消息的發送者 MessageProducer producer = null; //定義消息 Message message = null; try { /** * userName:訪問ActiveMQ服務的用戶名,預設為admin。 * password:訪問ActiveMQ服務的密碼,預設為admin * 用戶名和用戶密碼都可以通過ActiveMQ安裝目錄的oonf目錄下的jetty-ream.properties文件進行修改 * * borkerURL:訪問ActiveMQ服務的路徑地址。 * 路徑結構為:協議名://主機地址:埠號 * 在conf/activemq.xml文件中可以找到修改 * 在上一篇文章中都有介紹 */ connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //創建鏈接對象 connection = connectionFactory.createConnection(); //啟動連接 connection.start(); /** * 參數一:transacted 是否使用事務 可選值為:true|false * ture:使用事務 設置第二個參數變數這為 Session.SESSION_TRANSACTION 交由session管理 * false:不使用事務,則設置我們的參數即可 * * acknowledgeMode: * * Session.AUTO_ACKNOWLEDGE:自動消息確認機制 * * Session.CLIENT_ACKNOWLEDGE:客戶端確認機制 * * Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認消息機制 這裡設置 這兩個參數的含義為: 不使用事務,並由Session自動確認提交 * 這裡對此不作過多講解,初學者跟著敲,其後在慢慢瞭解原理 * */ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //創建目的地,目的地名稱即隊列的名稱,也就是保存我們消息地方的名稱。消息的消費者需要通過此名稱訪問對應的隊列 //名稱不固定 destination = session.createQueue("hello-mq"); //創建消息的生產者 需要指定目的地(也就是把Destination傳入參數) producer = session.createProducer(destination); //創建消息對象 並傳入我們需要放入隊列的消息 message = session.createTextMessage(msg); //發送消息 producer.send(message); } catch (Exception e) { e.printStackTrace(); } finally { //回收消息發送者資源 if (producer != null) { try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
2. 消息消費者
2.1創建模塊
2,2在POM文件添加ActiveMQ的依賴(根據個人的ActiveMQ版本自行查找)
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency>
2.3編寫消息的消費者
package cn.arebirth.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQTextConsumer { public void receiveTextActiveMQ() { //定義鏈接工廠 ConnectionFactory connectionFactory = null; //定義鏈接對象 Connection connection = null; //定義會話 Session session = null; //目的地 Destination destination = null; //定義消息的消費者(接收者) MessageConsumer consumer = null; //定義消息 Message message = null; try { /** * userName:訪問ActiveMQ服務的用戶名,預設為admin。 * password:訪問ActiveMQ服務的密碼,預設為admin * 用戶名和用戶密碼都可以通過ActiveMQ安裝目錄的oonf目錄下的jetty-ream.properties文件進行修改 * * borkerURL:訪問ActiveMQ服務的路徑地址。 * 路徑結構為:協議名://主機地址:埠號 * 在conf/activemq.xml文件中可以找到修改 * 在上一篇文章中都有介紹 */ connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //創建鏈接對象 connection = connectionFactory.createConnection(); //啟動連接 connection.start(); /** * 參數一:transacted 是否使用事務 可選值為:true|false * ture:使用事務 設置第二個參數變數這為 Session.SESSION_TRANSACTION 交由session管理 * false:不使用事務,則設置我們的參數即可 * * acknowledgeMode: * * Session.AUTO_ACKNOWLEDGE:自動消息確認機制 * * Session.CLIENT_ACKNOWLEDGE:客戶端確認機制 * * Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認消息機制 這裡設置 這兩個參數的含義為: 不使用事務,並由Session自動確認提交 * 這裡對此不作過多講解,初學者跟著敲,其後在慢慢瞭解原理 * */ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //創建目的地,目的地名稱即隊列的名稱,也就是保存我們消息地方的名稱。消息的消費者需要通過此名稱訪問對應的隊列 //名稱不固定 destination = session.createQueue("hello-mq"); //創建消息的消費者 需要指定目的地(也就是把Destination傳入參數) consumer = session.createConsumer(destination); //創建消息對象 由消費者接收消息 message = consumer.receive(); //處理消息 String msg = ((TextMessage) message).getText(); System.out.println("ActiveMQ say:" + msg); } catch (Exception e) { e.printStackTrace(); } finally { //回收消息接收者資源 if (consumer != null) { try { consumer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
3 測試成果
定義測試類,來測試我們的成果!
3.1 定義producer測試類
package cn.arebirth.mq; public class ProducerTest { public static void main(String[] args) { ActiveMQTextProducer producer = new ActiveMQTextProducer(); producer.sendTextActiveMQ("Hello,ActiveMQ!"); } }當我們執行完這段代碼後,我們打開我們的ActiveMQ的控制面板(首先你要啟動你的ActiveMQ,上一篇有介紹到!)
接下來,當我們看到此圖片內容即說明producer成果運行!(hello-mq我們自定義的隊列名稱!)
3.2 定義 Consumer測試類
package cn.arebirth.mq; public class ConsumerTest { public static void main(String[] args) { ActiveMQTextConsumer consumer = new ActiveMQTextConsumer(); consumer.receiveTextActiveMQ(); } }
運行結果出現如下內容即成果!
當我們仔細觀察ActiveMQ控制台的時候會發現,
producer每產生一條消息對應的Number Of Pending Message和Message Enqueued就+1
consumer每次啟動消費消息的時候,Number Of pending Message就會-1 Message Dequeued +1
為什麼呢?哈哈 相信大家已經大概猜出來了或者看英文的意思
介紹下:
- Name:消息的名字
- Number Of Pending Message:為消費的消息數量
- Number Of Consumers:消費者的數量
- Message Enqueued:隊列共有過多少消息
- Message Dequeued:消費者消費了多少條消息
鼓搗完文本消息,那麼有的人會說,只能鼓搗文本??錯了,接下來我們搞實用的對象!(不是你和我,是我們和電腦 -. -)
三 ActiveMQ處理對象消息
有了上面處理文本消息的基礎,我們將很容易掌握處理對象消息的能力!這裡面的步驟,我們將略省一部分,和上面及其相似
1 對象創建(處理對象消息,那麼一定是要有個數據載體的對象啦,我們自己創建一個即可 生產者和消費者均需要此對象!)
package cn.arebirth.pojo; import java.io.Serializable; /** * 一定要序列化!! */ public class User implements Serializable { private String username; private String pwd; private String content; @Override public String toString() { return "User{" + "username='" + username + '\'' + ", pwd='" + pwd + '\'' + ", content='" + content + '\'' + '}'; } public User() { } public User(String username, String pwd, String content) { this.username = username; this.pwd = pwd; this.content = content; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPwd() { return pwd; } public void setPwd(String pwd) { this.pwd = pwd; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }
2 消息生產者
2.1 還是需要pom的jar包依賴的的,如果還在原來的環境上,則不需要添加啦!
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency>
2.2 編寫消息的生產者
package cn.arebirth.mq; import cn.arebirth.pojo.User; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQObjectProducer { public void sendObjectActiveMQ(User user){ //定義鏈接工廠 ConnectionFactory connectionFactory = null; //定義鏈接對象 Connection connection = null; //定義會話 Session session = null; //目的地 Destination destination = null; //定義消息的發送者 MessageProducer producer = null; //定義消息 Message message = null; try { /** * userName:訪問ActiveMQ服務的用戶名,預設為admin。 * password:訪問ActiveMQ服務的密碼,預設為admin * 用戶名和用戶密碼都可以通過ActiveMQ安裝目錄的oonf目錄下的jetty-ream.properties文件進行修改 * * borkerURL:訪問ActiveMQ服務的路徑地址。 * 路徑結構為:協議名://主機地址:埠號 * 在conf/activemq.xml文件中可以找到修改 * 在上一篇文章中都有介紹 */ connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //創建鏈接對象 connection = connectionFactory.createConnection(); //啟動連接 connection.start(); /** * 參數一:transacted 是否使用事務 可選值為:true|false * ture:使用事務 設置第二個參數變數這為 Session.SESSION_TRANSACTION 交由session管理 * false:不使用事務,則設置我們的參數即可 * * acknowledgeMode: * * Session.AUTO_ACKNOWLEDGE:自動消息確認機制 * * Session.CLIENT_ACKNOWLEDGE:客戶端確認機制 * * Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認消息機制 這裡設置 這兩個參數的含義為: 不使用事務,並由Session自動確認提交 * 這裡對此不作過多講解,初學者跟著敲,其後在慢慢瞭解原理 * */ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //創建目的地,目的地名稱即隊列的名稱,也就是保存我們消息地方的名稱。消息的消費者需要通過此名稱訪問對應的隊列 //名稱不固定 destination = session.createQueue("hello-mq-obj"); //創建消息的生產者 需要指定目的地(也就是把Destination傳入參數) producer = session.createProducer(destination); //創建消息對象 並傳入我們需要放入隊列的消息 message = session.createObjectMessage(user); //發送消息 producer.send(message); } catch (Exception e) { e.printStackTrace(); } finally { //回收消息發送者資源 if (producer != null) { try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
3 消息消費者
3.1 還是需要pom的jar包依賴的的,如果還在原來的環境上,則不需要添加啦!
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency>
3.2 編寫消息的消費者
package cn.arebirth.mq; import cn.arebirth.pojo.User; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQObjectConsumer { public void receiveObjectActiveMQ() { //定義鏈接工廠 ConnectionFactory connectionFactory = null; //定義鏈接對象 Connection connection = null; //定義會話 Session session = null; //目的地 Destination destination = null; //定義消息的消費者(接收者) MessageConsumer consumer = null; //定義消息 Message message = null; try { /** * userName:訪問ActiveMQ服務的用戶名,預設為admin。 * password:訪問ActiveMQ服務的密碼,預設為admin * 用戶名和用戶密碼都可以通過ActiveMQ安裝目錄的oonf目錄下的jetty-ream.properties文件進行修改 * * borkerURL:訪問ActiveMQ服務的路徑地址。 * 路徑結構為:協議名://主機地址:埠號 * 在conf/activemq.xml文件中可以找到修改 * 在上一篇文章中都有介紹 */ connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //創建鏈接對象 connection = connectionFactory.createConnection(); //啟動連接 connection.start(); /** * 參數一:transacted 是否使用事務 可選值為:true|false * ture:使用事務 設置第二個參數變數這為 Session.SESSION_TRANSACTION 交由session管理 * false:不使用事務,則設置我們的參數即可 * * acknowledgeMode: * * Session.AUTO_ACKNOWLEDGE:自動消息確認機制 * * Session.CLIENT_ACKNOWLEDGE:客戶端確認機制 * * Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認消息機制 這裡設置 這兩個參數的含義為: 不使用事務,並由Session自動確認提交 * 這裡對此不作過多講解,初學者跟著敲,其後在慢慢瞭解原理 * */ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //創建目的地,目的地名稱即隊列的名稱,也就是保存我們消息地方的名稱。消息的消費者需要通過此名稱訪問對應的隊列 //名稱不固定 destination = session.createQueue("hello-mq-obj"); //創建消息的消費者 需要指定目的地(也就是把Destination傳入參數) consumer = session.createConsumer(destination); //創建消息對象 由消費者接收消息 message = consumer.receive(); //處理消息 ObjectMessage objectMessage = (ObjectMessage) message; User user = (User) objectMessage.getObject(); System.out.println("ActiveMQ say:" + user); } catch (Exception e) { e.printStackTrace(); } finally { //回收消息接收者資源 if (consumer != null) { try { consumer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
4 創建測試類
4.1 Produce生產者測試代碼
package cn.arebirth.mq; import cn.arebirth.pojo.User; public class ProducerTest { public static void main(String[] args) { ActiveMQObjectProducer producer = new ActiveMQObjectProducer(); producer.sendObjectActiveMQ(new User("Arebirth","123","Hello,ActiveMQ!")); } }
當我們執行完這段代碼後,我們打開我們的ActiveMQ的控制面板(首先你要啟動你的ActiveMQ,上一篇有介紹到!)
會看到 隊列中增加了我們的消息!
4.2 Consumer消費者測試代碼
package cn.arebirth.mq; public class ConsumerTest { public static void main(String[] args) { ActiveMQObjectConsumer consumer = new ActiveMQObjectConsumer(); consumer.receiveObjectActiveMQ(); } }當我們執行後,出現下麵的內容即可!成功啦!
在看下,控制面板,
是不是已經和上一個列子一樣啦!
ps
本篇內容就到這裡了,希望各位勤加敲代碼,畢竟代碼不是看出來的~