RabbitMQ 原文譯04--發佈和訂閱

来源:http://www.cnblogs.com/grayguo/archive/2016/04/06/5356070.html
-Advertisement-
Play Games

發佈/訂閱 在之前的案例中我們創建了一個工作隊列,這個工作隊列的實現思想就是一個把每一個任務平均分配給每一個執行者,在這個篇文章我們會做一些不一樣的東西,把一個消息發送給多個消費者,這種模式就被稱作"發佈/訂閱". 為了說明這個模式,我們將要創建一個簡單的日誌系統,一個負責發佈消息,另外一個負責接收 ...


發佈/訂閱

在之前的案例中我們創建了一個工作隊列,這個工作隊列的實現思想就是一個把每一個任務平均分配給每一個執行者,在這個篇文章我們會做一些不一樣的東西,把一個消息發送給多個消費者,這種模式就被稱作"發佈/訂閱".

為了說明這個模式,我們將要創建一個簡單的日誌系統,一個負責發佈消息,另外一個負責接收列印他們.

在我們的日誌系統中,每一個運行中的接收者副本將都會獲得消息,這種方式可以讓我們在運行一個接收者直接把消息保存在磁碟的同時,另外一個消費者可以把消息列印到屏幕上.

本質上,發佈一個日誌消息將會廣播給所有的接收者

交換機(Exchanges)

在之前的文章中,我們接受和發送消息都是通過一個隊列來完成了,現在是時候引入RabbitMQ的全部工作模型了.

讓我們快速回憶一下之前涉及到的模型

--生產者(發佈者),是一個負責發送消息的用戶應用程式.

--隊列,負責存儲消息

--消費者(接收者),負責接收消息的用戶程式.

RabbitMQ的核心思想是生產者永遠不會直接把消息發送給隊列,事實上生產者甚至經常不知道一個發出去的消息是否可以有隊列去接收它.

相應的,生產者只能消息發送給交換機,交換機的工作機制非常簡單,一方面它從生產者那裡接收到消息,另一方面它會把消息發送給相應的隊列上.交換機必須要知道怎麼處理接收到的消息,它應該被放入一個特殊的隊列嗎?它是否應該被放入多個隊列?或者它是否需要被忽略.

處理這工作的方式是通過交換機類型來實現的.

這裡有幾個可用的交換機類型:direct,topic,headers,fanout 我們將會關註最後一個(fanout),讓我們創建一個fanout的交換機,名字叫做'logs'

channel.ExchangeDeclare("logs", "fanout");

這個fanout的交換機功能非常簡單(你也許已經從名字中猜到了他的方式),把接收到的消息廣播給所有已知的隊列,這個這是我們的日誌系統需要的.

列出RabbitMQ已添加的交換機:

cmd:rabbitmqctl list_exchanges

 

無命名的交換機:在之前的案例中我們對於交換機一無所知,但是仍然可以把消息發送到隊列上,這是因為我們使用的是一個預設的交互機,名字為空(""),回顧一下我們之前發送消息的方式

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",routingKey: "hello", basicProperties: null,body: body);

第一個參數就是交換機的名稱,空字元串表示預設的無命名的交換機:消息通過存在的RoutingKey被髮送到隊列上.

現在我們發送命名的交換機代替:

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null,  body: body);

臨時隊列

在之前的案例中,我們使用的隊列是一個指定了名字的隊列(記得hello 和task_queue 嗎),給一個隊名命名是嚴格的,我們需要執行者連接的同樣的隊列來工作,當你想在生產者和消費者之間共用隊列的時候指定一個隊列名是非常重要的.但是我們的日誌系統則不在此列,

我們想要監聽到所有的日誌消息,而不僅僅是他們的子集,我們也僅僅對當前正在流轉的消息感興趣,而不是老的消息,結局這個問題我們需要2件事情.

首先,無論何時我們連接到隊列,我們都需要一個新鮮的,空的隊列,為了實現這個目標我們可以每次創建一個隨機名稱的隊列,或者更加便捷的方式--讓服務為我們的隊列隨機命名.

第二,一旦我們斷開到消費者到隊列的連接,我們需要自動刪除隊列.

在.Net客戶端,我們使用無參的queueDeclare()方法來創建一個隨機命名的非持久的,自動刪除的排他隊列.

var queueName = channel.QueueDeclare().QueueName;

queueName就是一個隨機的隊列名,如:amq.gen-JzTY20BRgKO-HjmUJj0wLg.

綁定

我們已經創建了一個fanout的交換機和一個隊列,現在我們需要告訴我們交換機發送消息到我們的隊列,交換機和隊列之間的關係叫做綁定.

channel.QueueBind(queue: queueName,exchange: "logs", routingKey: "");

從現在開始logs 交換機將會把消息放入我們的隊列當中.

列出隊列cmd: rabbitmqctl list_bindings

彙總

負責發送消息的生產者可之前案例基本上是一樣的,最大的不同是我們將消息發送到了我們的命名隊列logs上而不是預設的隊列上,發送的時候我們需要使用routingKey,但是它的值是被fanout交換機忽略的.

EmitLog.cs

class EmitLog
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            var message = GetMessage(args);
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0)
               ? string.Join(" ", args)
               : "info: Hello World!");
    }
}

正如你看到的,我們在建立連接之後創建了一個隊列,這一步是必須的,因為發送到一個不存在的交換機是不被允許的。

當隊列還沒有綁定到交換機是發送的消息將會丟失,但是這對我們日誌系統來說沒有問題,當沒有消費者監聽時我們可以安全的忽略這個消息。

ReceiveLogs.cs:

class ReceiveLogs
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queue: queueName,exchange: "logs",routingKey: "");

            Console.WriteLine(" [*] Waiting for logs.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] {0}", message);
            };
            channel.BasicConsume(queue: queueName,
                                 noAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

同時運行兩個receive,可以看到兩個接收端可以同時接收到一個消息。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1、通過JavaScript獲取本機的時間,自動更新 <script> function displayTime() { var date = new Date(); //日期對象 var now = ""; now = date.getFullYear() + "年"; now = now + ( ...
  • 演算法實例 排序演算法Sort 歸併排序MergeSort 演算法說明 1. 歸併的思路是任意兩個元素可以比較大小,那麼任意兩個有序的元素集合也可以通過比較大小的方式歸併成一個有序的元素集合 2. 任何的無序元素集合可以拆分的最小的可比較單位是元素本身,例如List list = new List() { ...
  • 最近在幫朋友做一個簡單管理系統,因為筆者夠懶,但是使用過的NHibernate用來做這中項目又太不實際了,索性百度了微型ORM,FluentData是第一個跳入我眼睛的詞。簡單的瞭解下FluentData使用,主要看看是不是符合筆者的要求。嗯……嗯……反應還不錯,API使用也很簡單,參數化查詢、分頁 ...
  • 前言 HtmlHelper方法為我們提供很多html標簽,只需在頁面調用就行了,但是微軟並沒有把所有的html標簽都對應有了擴展方法,需要我們重新自定義Htmlper,來滿足我們需要。 方法 上面代碼解讀: 在使用TagBuilder需要引入命名空間System.Web.Mvc。 Submit方法名 ...
  • 最近整理了一份jQueryUI文檔,方便以後學習和運用。 把玩地址 ...
  • Web service是一個基於可編程的web的應用程式,用於開發分散式的互操作的應用程式,也是一種web服務 WebService的特性有以下幾點: 1.使用XML(標準通用標記語言)來作為數據交互的格式。 2.跨平臺性,因為使用XML所以只要本地應用可以連接網路解析XML就可以實現數據交換,比如 ...
  • 我們很高興宣佈2016年 V1 版本發佈了,可免費下載試用。 今年ComponentOne 將聚焦WinForm、WPF、MVC、UWP平臺和核心控制項Flex家族。 本次發佈主要包括UWP平臺;WinForm平臺的FlexPivot控制項和MVC平臺的FlexSheet控制項;FlexChart、Fle ...
  • 本文綜合整理自知乎同名問答帖,題主的補充: 比 如技能有:可以用cmd 命令查詢到電腦的各種狀態, 可以用快捷鍵瞬間轉換視窗頁面的軟體…當然這些都是些簡單的…還有神麽不為人知的高端技能大家可以來露一手麽? 有什麼網站或者論壇可以接觸到這些高端技能麽? 以前我覺得學電腦的室友可以再 cmd里設置電腦 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...