java基礎(六):RabbitMQ 入門

来源:https://www.cnblogs.com/dz-boss/archive/2019/05/05/10817100.html
-Advertisement-
Play Games

建議先瞭解為什麼項目要使用 MQ 消息隊列,MQ 消息隊列有什麼優點,如果在業務邏輯上沒有此種需求,建議不要使用中間件。中間件對系統的性能做優化的同時,同時增加了系統的複雜性也維護難易度;其次,需要瞭解各種常見的 MQ 消息隊列有什麼區別,以便在相同的成本下選擇一種最合適本系統的技術。 本文主要討論 ...


 建議先瞭解為什麼項目要使用 MQ 消息隊列,MQ 消息隊列有什麼優點,如果在業務邏輯上沒有此種需求,建議不要使用中間件。中間件對系統的性能做優化的同時,同時增加了系統的複雜性也維護難易度;其次,需要瞭解各種常見的 MQ 消息隊列有什麼區別,以便在相同的成本下選擇一種最合適本系統的技術。

本文主要討論 RabbitMQ,從3月底接觸一個項目使用了 RabbitMQ,就開始著手學習,主要通過視頻和博客學習了一個月,基本明白了 RabbitMQ 的應用,其它的 MQ 隊列還不清楚,其底層技術還有待學習,以下是我目前的學習心得。

1.安裝 Erlang

RabbitMQ 是基於 Erlang 語言寫的,所以首先安裝 Erlang,本例是在 Windows 上安裝,也可以選擇在 Linux 上安裝,機器上沒有虛擬機,直接在 Windows 上操作,建議在 Linux 上安裝。官方下載 Erlang 軟體,我下載最新版本 21.3。安裝過程很簡單,直接 Next 到底。 Linux 安裝自行谷歌。如下圖:



安裝結束後,設置環境變數,如下圖
 
測試是否安裝成功

2.安裝 RabbitMQ

官方下載,選擇最新版本 3.7。安裝過程很簡單,直接 Next 到底。如下圖:


測試安裝是否成功,進入安裝目錄 sbin,執行 rabbitmq-plugins enable rabbitmq_management 命令,出現下麵界面,證明安裝成功(建議以管理員方式打開 dos)。
 

執行 rabbitmq-server start 命令,啟動服務。本地登陸並創建用戶,如下圖:

關於tags標簽的解釋:

1、  超級管理員(administrator)

可登陸管理控制台,可查看所有的信息,並且可以對用戶,策略(policy)進行操作。

2、  監控者(monitoring)

可登陸管理控制台,同時可以查看rabbitmq節點的相關信息(進程數,記憶體使用情況,磁碟使用情況等)

3、  策略制定者(policymaker)

可登陸管理控制台, 同時可以對policy進行管理。但無法查看節點的相關信息(上圖紅框標識的部分)。

4、  普通管理者(management)

僅可登陸管理控制台,無法看到節點信息,也無法對策略進行管理。

5、  其他

無法登陸管理控制台,通常就是普通的生產者和消費者。

4.JAVA 操作RabbitMQ

參考 RabbitMQ 官網,一共分為6個模式

RabbitMQ 是一個消息代理,實際上,它接收生產者產生的消息,然後將消息傳遞給消費者。在這個過程中,它可以路由、緩衝、持久化等,在傳輸過程中,主要又三部分組成。

生產者:發送消息的一端

隊列:它活動在 RabbitMQ 伺服器中,消息存儲的地方,隊列本質上是一個緩衝對象,所以存儲的消息不受限制 消費者:消息接收端 一般情況下,消息生產者、消費者和隊列不在同一臺伺服器上,本地做測試,放在一臺伺服器上。 測試項目直接創建一個 maven 格式的項目,沒必要創建網路格式。新建一個項目,如下圖:首先準備操作 MQ 的環境

(1): 準備必要的 Pom 文件,導入相應的 jar 包,

  1 <?xml version="1.0" encoding="UTF-8"?>
  2 
  3 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5   <modelVersion>4.0.0</modelVersion>
  6 
  7   <groupId>com.edu</groupId>
  8   <artifactId>rabbitmqdemo</artifactId>
  9   <version>1.0</version>
 10 
 11   <name>rabbitmqdemo</name>
 12   <!-- FIXME change it to the project's website -->
 13   <url>http://www.example.com</url>
 14 
 15   <properties>
 16     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 17     <maven.compiler.source>1.7</maven.compiler.source>
 18     <maven.compiler.target>1.7</maven.compiler.target>
 19   </properties>
 20 
 21   <dependencies>
 22     <!--測試包-->
 23     <dependency>
 24       <groupId>junit</groupId>
 25       <artifactId>junit</artifactId>
 26       <version>4.11</version>
 27       <scope>test</scope>
 28     </dependency>
 29     <!--mq客戶端-->
 30     <dependency>
 31       <groupId>com.rabbitmq</groupId>
 32       <artifactId>amqp-client</artifactId>
 33       <version>4.5.0</version>
 34     </dependency>
 35     <!--日誌-->
 36     <dependency>
 37       <groupId>org.slf4j</groupId>
 38       <artifactId>slf4j-log4j12</artifactId>
 39       <version>1.7.25</version>
 40     </dependency>
 41     <!--工具包-->
 42     <dependency>
 43       <groupId>org.apache.commons</groupId>
 44       <artifactId>commons-lang3</artifactId>
 45       <version>3.3.2</version>
 46     </dependency>
 47     <!--spring集成-->
 48     <dependency>
 49       <groupId>org.springframework.amqp</groupId>
 50       <artifactId>spring-rabbit</artifactId>
 51       <version>1.7.6.RELEASE</version>
 52     </dependency>
 53     <dependency>
 54       <groupId>org.springframework</groupId>
 55       <artifactId>spring-test</artifactId>
 56       <version>4.3.7.RELEASE</version>
 57     </dependency>
 58       <dependency>
 59           <groupId>junit</groupId>
 60           <artifactId>junit</artifactId>
 61           <version>RELEASE</version>
 62           <scope>compile</scope>
 63       </dependency>
 64   </dependencies>
 65 
 66   <build>
 67     <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
 68       <plugins>
 69         <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
 70         <plugin>
 71           <artifactId>maven-clean-plugin</artifactId>
 72           <version>3.1.0</version>
 73         </plugin>
 74         <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
 75         <plugin>
 76           <artifactId>maven-resources-plugin</artifactId>
 77           <version>3.0.2</version>
 78         </plugin>
 79         <plugin>
 80           <artifactId>maven-compiler-plugin</artifactId>
 81           <version>3.8.0</version>
 82         </plugin>
 83         <plugin>
 84           <artifactId>maven-surefire-plugin</artifactId>
 85           <version>2.22.1</version>
 86         </plugin>
 87         <plugin>
 88           <artifactId>maven-jar-plugin</artifactId>
 89           <version>3.0.2</version>
 90         </plugin>
 91         <plugin>
 92           <artifactId>maven-install-plugin</artifactId>
 93           <version>2.5.2</version>
 94         </plugin>
 95         <plugin>
 96           <artifactId>maven-deploy-plugin</artifactId>
 97           <version>2.8.2</version>
 98         </plugin>
 99         <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
100         <plugin>
101           <artifactId>maven-site-plugin</artifactId>
102           <version>3.7.1</version>
103         </plugin>
104         <plugin>
105           <artifactId>maven-project-info-reports-plugin</artifactId>
106           <version>3.0.0</version>
107         </plugin>
108       </plugins>
109     </pluginManagement>
110   </build>
111 </project>

 

(2): 建立日誌配置文件,在 resources 下建立 log4j.properties,便於列印精確的日誌信息

1 log4j.rootLogger=DEBUG,A1
2 log4j.logger.com.edu=DEBUG
3 log4j.logger.org.mybatis=DEBUG
4 log4j.appender.A1=org.apache.log4j.ConsoleAppender
5 log4j.appender.A1.layout=org.apache.log4j.PatternLayout
6 log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-%m%n

(3): 編寫一個工具類,主要用於連接 RabbitMQ

 1 package com.edu.util;
 2 
 3 
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 
 7 /**
 8  * @ClassName ConnectionUtil
 9  * @Deccription 穿件連接的工具類
10  * @Author DZ
11  * @Date 2019/5/4 12:27
12  **/
13 public class ConnectionUtil {
14     /**
15      * 創建連接工具
16      *
17      * @return
18      * @throws Exception
19      */
20     public static Connection getConnection() throws Exception {
21         ConnectionFactory connectionFactory = new ConnectionFactory();
22         connectionFactory.setHost("127.0.0.1");//MQ的伺服器
23         connectionFactory.setPort(5672);//預設埠號
24         connectionFactory.setUsername("test");
25         connectionFactory.setPassword("test");
26         connectionFactory.setVirtualHost("/test");
27         return connectionFactory.newConnection();
28     }
29 }

項目總體圖如下:

4.1.Hello World模式

此模式非常簡單,一個生產者對應一個消費者

首先我們製造一個消息生產者,併發送消息:
 1 package com.edu.hello;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 
 7 /**
 8  * @ClassName Sender
 9  * @Deccription 創建發送者
10  * @Author DZ
11  * @Date 2019/5/4 12:45
12  **/
13 public class Sender {
14     private final static String QUEUE = "testhello"; //隊列的名字
15 
16     public static void main(String[] srgs) throws Exception {
17         //獲取連接
18         Connection connection = ConnectionUtil.getConnection();
19         //創建連接
20         Channel channel = connection.createChannel();
21         //聲明隊列
22         //參數1:隊列的名字
23         //參數2:是否持久化隊列,我們的隊列存在記憶體中,如果mq重啟則丟失。如果為ture,則保存在erlang的資料庫中,重啟,依舊保存
24         //參數3:是否排外,我們連接關閉後是否自動刪除隊列,是否私有當前隊列,如果私有,其他隊列不能訪問
25         //參數4:是否自動刪除
26         //參數5:我們傳入的其他參數
27         channel.queueDeclare(QUEUE, false, false, false, null);
28         //發送內容
29         channel.basicPublish("", QUEUE, null, "要發送的消息".getBytes());
30         //關閉連接
31         channel.close();
32         connection.close();
33     }
34 }

定義一個消息接受者

 1 package com.edu.hello;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 import com.rabbitmq.client.QueueingConsumer;
 7 
 8 /**
 9  * @ClassName Recver
10  * @Deccription 消息接受者
11  * @Author DZ
12  * @Date 2019/5/4 12:58
13  **/
14 public class Recver {
15     private final static String QUEUE = "testhello";//消息隊列的名稱
16 
17     public static void main(String[] args) throws Exception {
18         Connection connection = ConnectionUtil.getConnection();
19         Channel channel = connection.createChannel();
20         channel.queueDeclare(QUEUE, false, false, false, null);
21         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
22         //接受消息,參數2表示自動確認消息
23         channel.basicConsume(QUEUE, true, queueingConsumer);
24         while (true) {
25             //獲取消息
26             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();//如果沒有消息就等待,有消息就獲取消息,並銷毀,是一次性的
27             String message = new String(delivery.getBody());
28             System.out.println(message);
29         }
30     }
31 }

此種模式屬於“點對點”模式,一個生產者、一個隊列、一個消費者,可以運用在聊天室(實際上真實的聊天室比這複雜很多,雖然是“點對點”模式,但是並不是一個生產者,一個隊列,一個消費者)

4.2.work queues

一個生產者對應多個消費者,但是只有一個消費者獲得消息

定義消息製造者:

 1 package com.edu.work;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 
 7 /**
 8  * @ClassName Sender
 9  * @Deccription 創建發送者
10  * @Author DZ
11  * @Date 2019/5/4 12:45
12  **/
13 public class Sender {
14     private final static String QUEUE = "testhellowork"; //隊列的名字
15 
16     public static void main(String[] srgs) throws Exception {
17         //獲取連接
18         Connection connection = ConnectionUtil.getConnection();
19         //創建連接
20         Channel channel = connection.createChannel();
21         //聲明隊列
22         //參數1:隊列的名字
23         //參數2:是否持久化隊列,我們的隊列存在記憶體中,如果mq重啟則丟失。如果為ture,則保存在erlang的資料庫中,重啟,依舊保存
24         //參數3:是否排外,我們連接關閉後是否自動刪除隊列,是否私有當前隊列,如果私有,其他隊列不能訪問
25         //參數4:是否自動刪除
26         //參數5:我們傳入的其他參數
27         channel.queueDeclare(QUEUE, false, false, false, null);
28         //發送內容
29         for (int i = 0; i < 100; i++) {
30             channel.basicPublish("", QUEUE, null, ("要發送的消息" + i).getBytes());
31         }
32         //關閉連接
33         channel.close();
34         connection.close();
35     }
36 }
 

定義2個消息消費者

 1 package com.edu.work;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 import java.util.Queue;
 8 
 9 /**
10  * @ClassName Recver1
11  * @Deccription 消息接受者
12  * @Author DZ
13  * @Date 2019/5/4 12:58
14  **/
15 public class Recver1 {
16     private final static String QUEUE = "testhellowork";//消息隊列的名稱
17 
18     public static void main(String[] args) throws Exception {
19         Connection connection = ConnectionUtil.getConnection();
20         final Channel channel = connection.createChannel();
21         channel.queueDeclare(QUEUE, false, false, false, null);
22         //channel.basicQos(1);//告訴伺服器,當前消息沒有確認之前,不要發送新消息,合理自動分配資源
23         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
24             @Override
25             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
26                 //收到消息時候調用
27                 System.out.println("消費者1收到的消息:" + new String(body));
28                 /*super.handleDelivery(consumerTag, envelope, properties, body);*/
29                 //確認消息
30                 //參數2:false為確認收到消息,ture為拒絕收到消息
31                 channel.basicAck(envelope.getDeliveryTag(), false);
32             }
33         };
34         //註冊消費者
35         // 參數2:手動確認,我們收到消息後,需要手動確認,告訴伺服器,我們收到消息了
36         channel.basicConsume(QUEUE, false, defaultConsumer);
37     }
38 }
 1 package com.edu.work;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • mq系列文章 對mq瞭解不是很多的,可以看一下下麵兩篇文章: 1. "聊聊mq的使用場景" 2. "聊聊業務系統中投遞消息到mq的幾種方式" 3. 聊聊消息消費的幾種方式 4. 如何確保消息至少消費一次 5. 如何保證消息消費的冪等性 本章內容 從消費者的角度出發,分析一下消息消費的兩種方式: 1. ...
  • day21 03 異常處理 1.什麼是異常 異常:程式運行時發生錯誤的信號 錯誤:語法錯誤(一般是不能處理的異常) 邏輯錯誤(可處理的異常) 特點:程式一旦發生錯誤,就從錯誤的位置停下來,不再繼續執行後面的內容 2.怎麼處理異常呢? 比如下麵類型代碼的異常: 如果執行後用戶輸入的不是數據就會報錯: ...
  • 簡介 Hystrix Dashboard是一款針對Hystrix進行實時監控的工具,通過Hystrix Dashboard可以直觀地看到各Hystrix Command的請求響應時間,請求成功率等數據。 快速上手 工程說明 | 工程名 | 埠 | 作用 | | : | : | : : | | eu ...
  • 變數的使用: def test(request): num=1 s='hello' li=[1,2,['a','b']] dic={'name':'w','age':1} se={1,2,3} tup=(1,2,3,4) def my_test(): return '這是my_test' class ...
  • 所屬網站分類: python高級 > 面向對象 作者:阿裡媽媽 鏈接:http://www.pythonheidong.com/blog/article/74/ 來源:python黑洞網 有什麼區別? class Child(SomeBaseClass): def __init__(self): s ...
  • Unsafe是什麼? Unsafe只有CAS的功能嗎? Unsafe為什麼是不安全的? 怎麼使用Unsafe? ...
  • 一、開發環境 1、windows 7 企業版 2、Eclipse IDE for Enterprise Java Developers Version: 2019-03 (4.11.0) 3、JDK 1.8 4、Maven 3.5.2 5、MariaDB 6、Tomcat 8.5 二、基礎配置 1、 ...
  • Python基礎數據類型:字典dict 01 內容大綱 字典的初識 字典的使用(增刪改查) 字典的嵌套 02 具體內容 字典的初識: why: 列表可以存儲大量的數據,數據之間的關聯性不強 ['太白', 18, '男', '大壯', 3, '男'] 列表的查詢速度比較慢。 what:容器型數據類型: ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...