rabbitMQ模式

来源:https://www.cnblogs.com/matd/archive/2019/03/02/10463143.html
-Advertisement-
Play Games

1.hello 1.hello 1.hello 1.hello 消息生產者p將消息放入隊列 消費者監聽隊列,如果隊列中有消息,就消費掉,消息被拿走後,自動從隊列刪除(隱患,消息可能沒有被消費者正確處理,已經消失了,無法恢復) 應用場景:聊天室 案例: 1>.首先準備依賴 <dependency> < ...


1.hello

 消息生產者p將消息放入隊列

消費者監聽隊列,如果隊列中有消息,就消費掉,消息被拿走後,自動從隊列刪除
(隱患,消息可能沒有被消費者正確處理,已經消失了,無法恢復)

應用場景:聊天室 

案例:

1>.首先準備依賴

<dependency>  
     <groupId>org.springframework.boot</groupId>  
     <artifactId>spring-boot-starter-amqp</artifactId>  
</dependency>

 

2>.寫一個test類

 

public class SimpleTest {  
   //模擬生產者將消息放入隊列  
   @Test  
   public void send() throws Exception{  
       /*1 創建連接工廠 
        * 2 配置共創config 
        * 3 獲取連接 
        * 4獲取通道 
        * 5 從通道聲明queue 
        * 6 發送消息 
        * 7 釋放資源 
        */
  
       ConnectionFactory factory=new ConnectionFactory();  
       factory.setHost("106.23.34.56");  
       factory.setPort(5672);  
       factory.setVirtualHost("/tb");  
       factory.setUsername("admin");  
       factory.setPassword("123456");  
       //從工廠獲取連接  
       Connection conn=factory.newConnection();  
       //從連接獲取通道  
       Channel chan=conn.createChannel();  
       //利用channel聲明第一個隊列  
       chan.queueDeclare("simple", false, false, false, null);  
       //queue String類型,表示聲明的queue對列的名字  
       //durable Boolean類型,表示是否持久化  
       //exclusive Boolean類型:當前聲明的queue是否專註;true當前連接創建的  
       //任何channle都可以連接這個queue,false,新的channel不可使用  
       //autoDelete Boolean類型:在最後連接使用完成後,是否刪除隊列,false  
       //arguments Map類型,其他聲明參數  
       //發送消息  
       String msg="helloworld,nihaoa";  
       chan.basicPublish("", "simple", null, msg.getBytes());  
       //exchange String類型,交換機名稱,簡單模式使用預設交換""  
       //routingkey String類型,當前的消息綁定的routingkey,簡單模式下,與隊列同名即可  
       //props BasicProperties類型,消息的屬性欄位對象,例如BasicProperties  
       //可以設置一個deliveryMode的值0 持久化,1 表示不持久化,durable配合使用  
       //body byte[] :消息字元串的byte數組  
   }  
   //模擬消費端  
   @Test  
   public void receive() throws Exception{

 

ConnectionFactory factory=new ConnectionFactory();  
factory.setHost("106.23.34.56");  
factory.setPort(5672);  
factory.setVirtualHost("/tb");  
factory.setUsername("admin");  
factory.setPassword("123456");  
//從工廠獲取連接

Connection conn=factory.newConnection();//從連接獲取通道Channel chan=conn.createChannel();chan.queueDeclare("simple", false, false, false, null);//創建一個消費者QueueingConsumer consumer= new QueueingConsumer(chan);chan.basicConsume("simple", consumer);//監聽隊列while(true){//獲取下一個delivery,delivery從隊列獲取消息Delivery delivery = consumer.nextDelivery();String msg=new String(delivery.getBody());System.out.println(msg);}}}

2.work模式

生產者將消息放入隊列
多個消費者同時監聽同一個隊列,消息如何被消費?
C1,C2共同爭搶當前消息隊列的內容,誰先拿到消息,誰來負責消費
應用場景:紅包;大型項目中的資源調度過程(直接由最空閑的系統爭搶到資源處理任務) 

案例:

1>首先寫一個工具類

public class ConnectionUtil {
 
 public static Connection getConn(){
   try{
     ConnectionFactory factory=new ConnectionFactory();
     factory.setHost("106.33.44.179");
     factory.setPort(5672);
     factory.setVirtualHost("/tb");
     factory.setUsername("admin");
     factory.setPassword("123456");
   
     //從工廠獲取連接
     Connection conn=factory.newConnection();
     return conn;
   }catch(Exception e){
     System.out.println(e.getMessage());
     return null;
   }
   
 }
}

 

2>寫test類

public class WorkTest {
 @Test
 public void send() throws Exception{
   //獲取連接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //聲明隊列
   chan.queueDeclare("work", false, false, false, null);
   for(int i=0;i<100;i++){
     String msg="1712,hello:"+i+"message";
     chan.basicPublish("", "work", null, msg.getBytes());
     System.out.println("第"+i+"條信息已經發送");
   }
   chan.close();
   conn.close();
 }
 @Test
 public void receive1() throws Exception{
   //獲取連接,獲取通道
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   chan.queueDeclare("work", false, false, false, null);
   //同一時刻伺服器只發送一條消息給同一消費者,消費者空閑,才發送一條
   chan.basicQos(1);
   //定義消費者
   QueueingConsumer consumer=new QueueingConsumer(chan);
   //綁定隊列和消費者的關係
   //queue
   //autoAck:消息被消費後,是否自動確認回執,如果false,不自動需要手動在
   //完成消息消費後進行回執確認,channel.ack,channel.nack
   //callback
   //chan.basicConsume(queue, autoAck, callback)
   chan.basicConsume("work", false, consumer);
   //監聽
   while(true){
     Delivery delivery=consumer.nextDelivery();
     byte[] result = delivery.getBody();
     String msg=new String(result);
     System.out.println("接受到:"+msg);
     Thread.sleep(50);
     //返回伺服器,回執
     chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
   }  
 }
 @Test
 public void receive2() throws Exception{
   //獲取連接,獲取通道
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   chan.queueDeclare("work", false, false, false, null);
   //同一時刻伺服器只發送一條消息給同一消費者,消費者空閑,才發送一條
   chan.basicQos(1);
   //定義消費者
   QueueingConsumer consumer=new QueueingConsumer(chan);
   //綁定隊列和消費者的關係
   //queue
   //autoAck:消息被消費後,是否自動確認回執,如果false,不自動需要手動在
   //完成消息消費後進行回執確認,channel.ack,channel.nack
   //callback
   //chan.basicConsume(queue, autoAck, callback)
   chan.basicConsume("work", false, consumer);
   //監聽
   while(true){
     Delivery delivery=consumer.nextDelivery();
     byte[] result = delivery.getBody();
     String msg=new String(result);
     System.out.println("接受到:"+msg);
     Thread.sleep(150);
     //返回伺服器,回執
     chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
   }
 }
 
}

3 publish/fanout發佈訂閱


生產者將消息交給交換機
有交換機根據發佈訂閱的模式設定將消息同步到所有的綁定隊列中;
後端的消費者都能拿到消息

應用場景:郵件群發,群聊天,廣告

案例:

public class FanoutTest {
 //交換機,有類型,發佈訂閱:fanout
 //路由模式:direct
 //主題模式:topic
 @Test
 public void send() throws Exception {
   //獲取連接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //聲明交換機
   //參數意義,1 交換機名稱,2 類型:fanout,direct,topic
   chan.exchangeDeclare("fanoutEx", "fanout");
   //發送消息
   for(int i=0;i<100;i++){
     String msg="1712 hello:"+i+"msg";
     chan.basicPublish("fanoutEx", "", null, msg.getBytes());
     System.out.println("第"+i+"條信息已經發送");
   }
 }
 
 @Test
 public void receiv01() throws Exception{
   //獲取連接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //生命隊列
   chan.queueDeclare("fanout01", false, false, false, null);
   //聲明交換機
   chan.exchangeDeclare("fanoutEx", "fanout");
   //綁定隊列到交換機
   //參數 1 隊列名稱,2 交換機名稱 3 路由key
   chan.queueBind("fanout01", "fanoutEx", "");
   chan.basicQos(1);
   //定義消費者
   QueueingConsumer consumer=new QueueingConsumer(chan);
   //消費者與隊列綁定
   chan.basicConsume("fanout01",false, consumer);
   while(true){
     Delivery delivery= consumer.nextDelivery();
     System.out.println("一號消費者接收到"+
     new String(delivery.getBody()));
     chan.basicAck(delivery.getEnvelope().
         getDeliveryTag(), false);
   }
 }
 @Test
 public void receiv02() throws Exception{
   //獲取連接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //生命隊列
   chan.queueDeclare("fanout02", false, false, false, null);
   //聲明交換機
   chan.exchangeDeclare("fanoutEx", "fanout");
   //綁定隊列到交換機
   //參數 1 隊列名稱,2 交換機名稱 3 路由key
   chan.queueBind("fanout02", "fanoutEx", "");
   chan.basicQos(1);
   //定義消費者
   QueueingConsumer consumer=new QueueingConsumer(chan);
   //消費者與隊列綁定
   chan.basicConsume("fanout02",false, consumer);
   while(true){
     Delivery delivery= consumer.nextDelivery();
     System.out.println("二號消費者接收到"+new String(delivery.getBody()));
     chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
   }
 }
}

4 routing路由模式

 

生產者發送消息到交換機,同時綁定一個路由Key,交換機根據路由key對下游綁定的隊列進行路
由key的判斷,滿足路由key的隊列才會接收到消息,消費者消費消息

應用場景: 項目中的error報錯

案例:

public class RoutingTopicTest {
 
 @Test
 public void routingSend() throws Exception{
   //獲取連接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //聲明交換機
   //參數意義,1 交換機名稱,2 類型:fanout,direct,topic
   chan.exchangeDeclare("directEx", "direct");
   //發送消息
   String msg="路由模式的消息";
   chan.basicPublish("directEx", "jt1713", 
       null, msg.getBytes());
 }
 @Test
 public void routingRec01() throws Exception{
   System.out.println("一號消費者等待接收消息");
   //獲取連接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //聲明隊列
   chan.queueDeclare("direct01", false, false, false, null);
   //聲明交換機
   chan.exchangeDeclare("directEx", "direct");
   //綁定隊列到交換機
   //參數 1 隊列名稱,2 交換機名稱 3 路由key
   chan.queueBind("direct01", "directEx", "jt1712");
   chan.basicQos(1);
   //定義消費者
   QueueingConsumer consumer=new QueueingConsumer(chan);
   //消費者與隊列綁定
   chan.basicConsume("direct01",false, consumer);
   while(true){
     Delivery delivery= consumer.nextDelivery();
     System.out.println("一號消費者接收到"+
     new String(delivery.getBody()));
     chan.basicAck(delivery.getEnvelope().
         getDeliveryTag(), false);
   }
 }
 @Test
 public void routingRec02() throws Exception{
   System.out.println("二號消費者等待接收消息");
   //獲取連接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //聲明隊列
   chan.queueDeclare("direct02", false, false, false, null);
   //聲明交換機
   chan.exchangeDeclare("directEx", "direct");
   //綁定隊列到交換機
   //參數 1 隊列名稱,2 交換機名稱 3 路由key
   chan.queueBind("direct02", "directEx", "jt1711");
   chan.basicQos(1);
   //定義消費者
   QueueingConsumer consumer=new QueueingConsumer(chan);
   //消費者與隊列綁定
   chan.basicConsume("direct02",false, consumer);
   while(true){
     Delivery delivery= consumer.nextDelivery();
     System.out.println("二號消費者接收到"+
     new String(delivery.getBody()));
     chan.basicAck(delivery.getEnvelope().
         getDeliveryTag(), false);
   }
 }
}

5 topic主題模式

*號代表單個詞語
#代表多個詞語

其他的內容與routing路由模式一致

案例:

public class RoutingTopicTest {
 
 
 @Test
 public void routingRec02() throws Exception{
   System.out.println("二號消費者等待接收消息");
   //獲取連接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //聲明隊列
   chan.queueDeclare("direct02", false, false, false, null);
   //聲明交換機
   chan.exchangeDeclare("directEx", "direct");
   //綁定隊列到交換機
   //參數 1 隊列名稱,2 交換機名稱 3 路由key
   chan.queueBind("direct02", "directEx", "jt1711");
   chan.basicQos(1);
   //定義消費者
   QueueingConsumer consumer=new QueueingConsumer(chan);
   //消費者與隊列綁定
   chan.basicConsume("direct02",false, consumer);
   while(true){
     Delivery delivery= consumer.nextDelivery();
     System.out.println("二號消費者接收到"+
     new String(delivery.getBody()));
     chan.basicAck(delivery.getEnvelope().
         getDeliveryTag(), false);
   }
 }
 
 @Test
 public void topicSend() throws Exception{
   //獲取連接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //聲明交換機
   //參數意義,1 交換機名稱,2 類型:fanout,direct,topic
   chan.exchangeDeclare("topicEx", "topic");
   //發送消息
   String msg="主題模式的消息";
   chan.basicPublish("topicEx", "jt1712.add.update", 
       null, msg.getBytes());
 }
 @Test
 public void topicRec01() throws Exception{
   System.out.println("一號消費者等待接收消息");
   //獲取連接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //聲明隊列
   chan.queueDeclare("topic01", false, false, false, null);
   //聲明交換機
   chan.exchangeDeclare("topicEx", "topic");
   //綁定隊列到交換機
   //參數 1 隊列名稱,2 交換機名稱 3 路由key
   chan.queueBind("topic01", "topicEx", "jt1712");
   chan.basicQos(1);
   //定義消費者
   QueueingConsumer consumer=new QueueingConsumer(chan);
   //消費者與隊列綁定
   chan.basicConsume("topic01",false, consumer);
   while(true){
     Delivery delivery= consumer.nextDelivery();
     System.out.println("一號消費者接收到"+
     new String(delivery.getBody()));
     chan.basicAck(delivery.getEnvelope().
         getDeliveryTag(), false);
   }
 }
 @Test
 public void topicRec02() throws Exception{
   System.out.println("二號消費者等待接收消息");
   //獲取連接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //聲明隊列
   chan.queueDeclare("topic02", false, false, false, null);
   //聲明交換機
   chan.exchangeDeclare("topicEx", "topic");
   //綁定隊列到交換機
   //參數 1 隊列名稱,2 交換機名稱 3 路由key
   chan.queueBind("topic02", "topicEx", "jt1712.#");
   chan.basicQos(1);
   //定義消費者
   QueueingConsumer consumer=new QueueingConsumer(chan);
   //消費者與隊列綁定
   chan.basicConsume("topic02",false, consumer);
   while(true){
     Delivery delivery= consumer.nextDelivery();
     System.out.println("二號消費者接收到"+
     new String(delivery.getBody()));
     chan.basicAck(delivery.getEnvelope().
         getDeliveryTag(), false);
   }
 }
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 背景 經測試,android手機中沒有這個問題, iphone手機中的Safari瀏覽器會出現這個問題。 例如: 解決辦法: 給鏈接加上 target="_parent", 如果iframe的嵌套比較深可以用 target="_top" ...
  • 有時直接打開本地html文件會使一些web操作無法進行,需要運行一個本地伺服器。 使用nodejs的 可以迅速地啟動一個本地靜態資源伺服器 ...
  • 前言 本次做後臺管理系統,採用的是 AntD 框架。涉及到圖片的上傳,用的是AntD的 "upload" 組件。 前端做文件上傳這個功能,是很有技術難度的。既然框架給我們提供好了,那就直接用唄。結果用的時候,發現 upload 組件的很多bug。下麵來列舉幾個。 備註:本文寫於2019 03 02, ...
  • 背景 公司為提高客服部門溝通效率對接電話呼叫中心,調研後選擇了亞馬遜的Amazon Connect服務,因為是國外業務沒有選擇用阿裡雲,怕有坑。 Amazon Connect後臺 需要在後臺創建“聯繫流”,也就是用戶接通電話後我們提供的一系列功能選項,比如開始放一段歡迎語音,然後提示用戶選擇1,2, ...
  • 今天我們來詳解一下git的各種命令,此為git的第一篇,後續還會有好幾篇,希望大家看了能有所進步 Git Commit Git 倉庫中的提交記錄保存的是你的目錄下所有文件的快照,就像是把整個目錄複製,然後再粘貼一樣,但比複製粘貼優雅許多! Git 希望提交記錄儘可能地輕量,因此在你每次進行提交時,它 ...
  • 在JavaScript中,使用var創建變數,會創建全局變數或局部變數。 只有在非函數內創建的變數,才是全局變數,該變數可以在任何地方被讀取。 而在函數內創建變數時,只有在函數內部才可讀取。在函數外部時,調用函數也無法讀取局部變數。 function test(){ var g = 5; } // ...
  • 計算屬性 1.1 什麼是計算屬性: 插值表達式常用於簡單的運算,當其過長或邏輯複雜時,會難以維護,這時應該使用計算屬性。 插值表達式里的值是JS表達式 所有的計算屬性都以函數的形式寫在Vue實例內的computed選項內,最終返回計算後的結果。 1.2 計算屬性的用法 在一個計算屬性里可以完成各種復 ...
  • MVC設計模式:modle層,view層,controller層 以前學習的servlet其實就是一個java類,或者說經過規範的java類,實際進行跳轉時,還是要在web.xml文件中配置才能正常跳轉。 controller層可以放servlet,在SpringMVC中則可以創建java類通過@c ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...