建議先瞭解為什麼項目要使用 MQ 消息隊列,MQ 消息隊列有什麼優點,如果在業務邏輯上沒有此種需求,建議不要使用中間件。中間件對系統的性能做優化的同時,同時增加了系統的複雜性也維護難易度;其次,需要瞭解各種常見的 MQ 消息隊列有什麼區別,以便在相同的成本下選擇一種最合適本系統的技術。 本文主要討論 ...
建議先瞭解為什麼項目要使用 MQ 消息隊列,MQ 消息隊列有什麼優點,如果在業務邏輯上沒有此種需求,建議不要使用中間件。中間件對系統的性能做優化的同時,同時增加了系統的複雜性也維護難易度;其次,需要瞭解各種常見的 MQ 消息隊列有什麼區別,以便在相同的成本下選擇一種最合適本系統的技術。
本文主要討論 RabbitMQ,從3月底接觸一個項目使用了 RabbitMQ,就開始著手學習,主要通過視頻和博客學習了一個月,基本明白了 RabbitMQ 的應用,其它的 MQ 隊列還不清楚,其底層技術還有待學習,以下是我目前的學習心得。
1.安裝 Erlang
RabbitMQ 是基於 Erlang 語言寫的,所以首先安裝 Erlang,本例是在 Windows 上安裝,也可以選擇在 Linux 上安裝,機器上沒有虛擬機,直接在 Windows 上操作,建議在 Linux 上安裝。官方下載 Erlang 軟體,我下載最新版本 21.3。安裝過程很簡單,直接 Next 到底。 Linux 安裝自行谷歌。如下圖:
安裝結束後,設置環境變數,如下圖
測試是否安裝成功
2.安裝 RabbitMQ
在官方下載,選擇最新版本 3.7。安裝過程很簡單,直接 Next 到底。如下圖:
測試安裝是否成功,進入安裝目錄 sbin,執行 rabbitmq-plugins enable rabbitmq_management 命令,出現下麵界面,證明安裝成功(建議以管理員方式打開 dos)。
執行 rabbitmq-server start 命令,啟動服務。本地登陸並創建用戶,如下圖:
關於tags標簽的解釋:1、 超級管理員(administrator)
可登陸管理控制台,可查看所有的信息,並且可以對用戶,策略(policy)進行操作。
2、 監控者(monitoring)
可登陸管理控制台,同時可以查看rabbitmq節點的相關信息(進程數,記憶體使用情況,磁碟使用情況等)
3、 策略制定者(policymaker)
可登陸管理控制台, 同時可以對policy進行管理。但無法查看節點的相關信息(上圖紅框標識的部分)。
4、 普通管理者(management)
僅可登陸管理控制台,無法看到節點信息,也無法對策略進行管理。
5、 其他
無法登陸管理控制台,通常就是普通的生產者和消費者。
4.JAVA 操作RabbitMQ
參考 RabbitMQ 官網,一共分為6個模式
RabbitMQ 是一個消息代理,實際上,它接收生產者產生的消息,然後將消息傳遞給消費者。在這個過程中,它可以路由、緩衝、持久化等,在傳輸過程中,主要又三部分組成。生產者:發送消息的一端
隊列:它活動在 RabbitMQ 伺服器中,消息存儲的地方,隊列本質上是一個緩衝對象,所以存儲的消息不受限制 消費者:消息接收端 一般情況下,消息生產者、消費者和隊列不在同一臺伺服器上,本地做測試,放在一臺伺服器上。 測試項目直接創建一個 maven 格式的項目,沒必要創建網路格式。新建一個項目,如下圖:首先準備操作 MQ 的環境(1): 準備必要的 Pom 文件,導入相應的 jar 包,
1 <?xml version="1.0" encoding="UTF-8"?> 2 3 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 7 <groupId>com.edu</groupId> 8 <artifactId>rabbitmqdemo</artifactId> 9 <version>1.0</version> 10 11 <name>rabbitmqdemo</name> 12 <!-- FIXME change it to the project's website --> 13 <url>http://www.example.com</url> 14 15 <properties> 16 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 17 <maven.compiler.source>1.7</maven.compiler.source> 18 <maven.compiler.target>1.7</maven.compiler.target> 19 </properties> 20 21 <dependencies> 22 <!--測試包--> 23 <dependency> 24 <groupId>junit</groupId> 25 <artifactId>junit</artifactId> 26 <version>4.11</version> 27 <scope>test</scope> 28 </dependency> 29 <!--mq客戶端--> 30 <dependency> 31 <groupId>com.rabbitmq</groupId> 32 <artifactId>amqp-client</artifactId> 33 <version>4.5.0</version> 34 </dependency> 35 <!--日誌--> 36 <dependency> 37 <groupId>org.slf4j</groupId> 38 <artifactId>slf4j-log4j12</artifactId> 39 <version>1.7.25</version> 40 </dependency> 41 <!--工具包--> 42 <dependency> 43 <groupId>org.apache.commons</groupId> 44 <artifactId>commons-lang3</artifactId> 45 <version>3.3.2</version> 46 </dependency> 47 <!--spring集成--> 48 <dependency> 49 <groupId>org.springframework.amqp</groupId> 50 <artifactId>spring-rabbit</artifactId> 51 <version>1.7.6.RELEASE</version> 52 </dependency> 53 <dependency> 54 <groupId>org.springframework</groupId> 55 <artifactId>spring-test</artifactId> 56 <version>4.3.7.RELEASE</version> 57 </dependency> 58 <dependency> 59 <groupId>junit</groupId> 60 <artifactId>junit</artifactId> 61 <version>RELEASE</version> 62 <scope>compile</scope> 63 </dependency> 64 </dependencies> 65 66 <build> 67 <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) --> 68 <plugins> 69 <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle --> 70 <plugin> 71 <artifactId>maven-clean-plugin</artifactId> 72 <version>3.1.0</version> 73 </plugin> 74 <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging --> 75 <plugin> 76 <artifactId>maven-resources-plugin</artifactId> 77 <version>3.0.2</version> 78 </plugin> 79 <plugin> 80 <artifactId>maven-compiler-plugin</artifactId> 81 <version>3.8.0</version> 82 </plugin> 83 <plugin> 84 <artifactId>maven-surefire-plugin</artifactId> 85 <version>2.22.1</version> 86 </plugin> 87 <plugin> 88 <artifactId>maven-jar-plugin</artifactId> 89 <version>3.0.2</version> 90 </plugin> 91 <plugin> 92 <artifactId>maven-install-plugin</artifactId> 93 <version>2.5.2</version> 94 </plugin> 95 <plugin> 96 <artifactId>maven-deploy-plugin</artifactId> 97 <version>2.8.2</version> 98 </plugin> 99 <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle --> 100 <plugin> 101 <artifactId>maven-site-plugin</artifactId> 102 <version>3.7.1</version> 103 </plugin> 104 <plugin> 105 <artifactId>maven-project-info-reports-plugin</artifactId> 106 <version>3.0.0</version> 107 </plugin> 108 </plugins> 109 </pluginManagement> 110 </build> 111 </project>
(2): 建立日誌配置文件,在 resources 下建立 log4j.properties,便於列印精確的日誌信息
1 log4j.rootLogger=DEBUG,A1 2 log4j.logger.com.edu=DEBUG 3 log4j.logger.org.mybatis=DEBUG 4 log4j.appender.A1=org.apache.log4j.ConsoleAppender 5 log4j.appender.A1.layout=org.apache.log4j.PatternLayout 6 log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-%m%n
(3): 編寫一個工具類,主要用於連接 RabbitMQ
1 package com.edu.util; 2 3 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 /** 8 * @ClassName ConnectionUtil 9 * @Deccription 穿件連接的工具類 10 * @Author DZ 11 * @Date 2019/5/4 12:27 12 **/ 13 public class ConnectionUtil { 14 /** 15 * 創建連接工具 16 * 17 * @return 18 * @throws Exception 19 */ 20 public static Connection getConnection() throws Exception { 21 ConnectionFactory connectionFactory = new ConnectionFactory(); 22 connectionFactory.setHost("127.0.0.1");//MQ的伺服器 23 connectionFactory.setPort(5672);//預設埠號 24 connectionFactory.setUsername("test"); 25 connectionFactory.setPassword("test"); 26 connectionFactory.setVirtualHost("/test"); 27 return connectionFactory.newConnection(); 28 } 29 }
項目總體圖如下:
4.1.Hello World模式
此模式非常簡單,一個生產者對應一個消費者
首先我們製造一個消息生產者,併發送消息:1 package com.edu.hello; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 /** 8 * @ClassName Sender 9 * @Deccription 創建發送者 10 * @Author DZ 11 * @Date 2019/5/4 12:45 12 **/ 13 public class Sender { 14 private final static String QUEUE = "testhello"; //隊列的名字 15 16 public static void main(String[] srgs) throws Exception { 17 //獲取連接 18 Connection connection = ConnectionUtil.getConnection(); 19 //創建連接 20 Channel channel = connection.createChannel(); 21 //聲明隊列 22 //參數1:隊列的名字 23 //參數2:是否持久化隊列,我們的隊列存在記憶體中,如果mq重啟則丟失。如果為ture,則保存在erlang的資料庫中,重啟,依舊保存 24 //參數3:是否排外,我們連接關閉後是否自動刪除隊列,是否私有當前隊列,如果私有,其他隊列不能訪問 25 //參數4:是否自動刪除 26 //參數5:我們傳入的其他參數 27 channel.queueDeclare(QUEUE, false, false, false, null); 28 //發送內容 29 channel.basicPublish("", QUEUE, null, "要發送的消息".getBytes()); 30 //關閉連接 31 channel.close(); 32 connection.close(); 33 } 34 }
定義一個消息接受者
1 package com.edu.hello; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 import com.rabbitmq.client.QueueingConsumer; 7 8 /** 9 * @ClassName Recver 10 * @Deccription 消息接受者 11 * @Author DZ 12 * @Date 2019/5/4 12:58 13 **/ 14 public class Recver { 15 private final static String QUEUE = "testhello";//消息隊列的名稱 16 17 public static void main(String[] args) throws Exception { 18 Connection connection = ConnectionUtil.getConnection(); 19 Channel channel = connection.createChannel(); 20 channel.queueDeclare(QUEUE, false, false, false, null); 21 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); 22 //接受消息,參數2表示自動確認消息 23 channel.basicConsume(QUEUE, true, queueingConsumer); 24 while (true) { 25 //獲取消息 26 QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();//如果沒有消息就等待,有消息就獲取消息,並銷毀,是一次性的 27 String message = new String(delivery.getBody()); 28 System.out.println(message); 29 } 30 } 31 }
此種模式屬於“點對點”模式,一個生產者、一個隊列、一個消費者,可以運用在聊天室(實際上真實的聊天室比這複雜很多,雖然是“點對點”模式,但是並不是一個生產者,一個隊列,一個消費者)
4.2.work queues
一個生產者對應多個消費者,但是只有一個消費者獲得消息定義消息製造者:
1 package com.edu.work; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 /** 8 * @ClassName Sender 9 * @Deccription 創建發送者 10 * @Author DZ 11 * @Date 2019/5/4 12:45 12 **/ 13 public class Sender { 14 private final static String QUEUE = "testhellowork"; //隊列的名字 15 16 public static void main(String[] srgs) throws Exception { 17 //獲取連接 18 Connection connection = ConnectionUtil.getConnection(); 19 //創建連接 20 Channel channel = connection.createChannel(); 21 //聲明隊列 22 //參數1:隊列的名字 23 //參數2:是否持久化隊列,我們的隊列存在記憶體中,如果mq重啟則丟失。如果為ture,則保存在erlang的資料庫中,重啟,依舊保存 24 //參數3:是否排外,我們連接關閉後是否自動刪除隊列,是否私有當前隊列,如果私有,其他隊列不能訪問 25 //參數4:是否自動刪除 26 //參數5:我們傳入的其他參數 27 channel.queueDeclare(QUEUE, false, false, false, null); 28 //發送內容 29 for (int i = 0; i < 100; i++) { 30 channel.basicPublish("", QUEUE, null, ("要發送的消息" + i).getBytes()); 31 } 32 //關閉連接 33 channel.close(); 34 connection.close(); 35 } 36 }
定義2個消息消費者
1 package com.edu.work; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 import java.util.Queue; 8 9 /** 10 * @ClassName Recver1 11 * @Deccription 消息接受者 12 * @Author DZ 13 * @Date 2019/5/4 12:58 14 **/ 15 public class Recver1 { 16 private final static String QUEUE = "testhellowork";//消息隊列的名稱 17 18 public static void main(String[] args) throws Exception { 19 Connection connection = ConnectionUtil.getConnection(); 20 final Channel channel = connection.createChannel(); 21 channel.queueDeclare(QUEUE, false, false, false, null); 22 //channel.basicQos(1);//告訴伺服器,當前消息沒有確認之前,不要發送新消息,合理自動分配資源 23 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { 24 @Override 25 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 26 //收到消息時候調用 27 System.out.println("消費者1收到的消息:" + new String(body)); 28 /*super.handleDelivery(consumerTag, envelope, properties, body);*/ 29 //確認消息 30 //參數2:false為確認收到消息,ture為拒絕收到消息 31 channel.basicAck(envelope.getDeliveryTag(), false); 32 } 33 }; 34 //註冊消費者 35 // 參數2:手動確認,我們收到消息後,需要手動確認,告訴伺服器,我們收到消息了 36 channel.basicConsume(QUEUE, false, defaultConsumer); 37 } 38 }
1 package com.edu.work; 2 3 import com.edu.util.ConnectionUtil; 4