架構設計之NodeJS操作消息隊列RabbitMQ

来源:https://www.cnblogs.com/wukong-holmes/archive/2018/07/13/9306733.html
-Advertisement-
Play Games

一. 什麼是消息隊列? 消息(Message)是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字元串,也可以更複雜,可能包含嵌入對象。 消息隊列(Message Queue)是一種應用間的通信方式,消息發送後可以立即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而 ...


一. 什麼是消息隊列?

消息(Message)是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字元串,也可以更複雜,可能包含嵌入對象。

消息隊列(Message Queue)是一種應用間的通信方式,消息發送後可以立即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不管是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。

二. 常用的消息隊列有哪些?

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq。

甚至現在部分NoSQL也可做消息隊列,如Redis。

三. 消息隊列的使用場景?

  • 非同步處理

  • 應用解耦

  • 流量削峰

四. 使用案例

上規模的公司都會有自己的日誌分析系統,日誌系統是怎麼實現的呢?

 

圖解:用戶在訪問應用的時候,我們要記錄下用戶的操作記錄和系統的異常日誌,常規的做法是將系統產生的日誌保存到伺服器磁碟,在伺服器中開啟定時任務,定時將磁碟的日誌信息傳入mq中(生產者),也定時將mq中的消息取出並存到相應的資料庫,如ElasticSearch或Hive中。

五. 如何安裝RabbitMQ?

上面的案例介紹了MQ的一個使用場景,我這裡是用RabbitMQ舉例,現實項目中可能用到的是Kafka。

  1. 首先安裝brew(mac為例)

    /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)" 
  2. 安裝RabbitMQ

    brew install rabbitmq
  3. 運行RabbitMQ

    進入到 /usr/local/Cellar/rabbitmq/3.7.7,執行

    sbin/rabbitmq-server
  4. 啟動插件

    進入到 /usr/local/Cellar/rabbitmq/3.7.7/sbin

    ./rabbitmq-plugins enable rabbitmq_management
  5. 登陸管理界面

    打開瀏覽器輸入:http://localhost:15672,RabbitMQ預設15672埠六. Nodejs操作RabbitMQ

     

 

網上可以找到好幾個相應的Node SDK,這裡推薦amqplib

1. 生產者

/**
 * 對RabbitMQ的封裝
 */
let amqp = require('amqplib');

class RabbitMQ {
    constructor() {
        this.hosts = [];
        this.index = 0;
        this.length = this.hosts.length;
        this.open = amqp.connect(this.hosts[this.index]);
    }
    sendQueueMsg(queueName, msg, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName).then(function (ok) {
                    return channel.sendToQueue(queueName, new Buffer(msg), {
                        persistent: true
                    });
                })
                    .then(function (data) {
                        if (data) {
                            errCallBack && errCallBack("success");
                            channel.close();
                        }
                    })
                    .catch(function () {
                        setTimeout(() => {
                            if (channel) {
                                channel.close();
                            }
                        }, 500)
                    });
            })
            .catch(function () {
                let num = self.index++;

                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index == 0;
                }
            });
    }
}

 

2. 消費者

/**
 * 對RabbitMQ的封裝
 */
let amqp = require('amqplib');

class RabbitMQ {
    constructor() {
        this.open = amqp.connect(this.hosts[this.index]);
    }
    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName)
                    .then(function (ok) {
                        return channel.consume(queueName, function (msg) {
                            if (msg !== null) {
                                let data = msg.content.toString();
                                channel.ack(msg);
                                receiveCallBack && receiveCallBack(data);
                            }
                        })
                            .finally(function () {
                                setTimeout(() => {
                                    if (channel) {
                                        channel.close();
                                    }
                                }, 500)
                            });
                    })
            })
            .catch(function () {
                let num = self.index++;
                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index = 0;
                    self.open = amqp.connect(self.hosts[0]);
                }
            });
    }

3. 通過生產者向MQ發送一個消息,並創建隊列

let mq = new RabbitMQ();
mq.sendQueueMsg('testQueue', 'my first message', (error) => {
    console.log(error)
})

執行之後,我們打開管理平臺,發現RabbbitMQ已經接受到了一條消息:

並且RabbbitMQ新增了一個隊列testQueue

4. 獲取指定隊列的消息

let mq = new RabbitMQ();
mq.receiveQueueMsg('testQueue',(msg) => 
{    
   console.log(msg)
})
// 輸出結果:my first message複製代碼

此時打開RabbitMQ管理平臺,消息數量已經變為0

綜上:我們簡單講述了消息隊列及RabbitMQ相關的一些知識,以及我們如何通過nodejs來生產與消費消息,上面講的比較簡單,之後會發表更多文章講述消息隊列集群搭建及容災的實現。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 效果如下: ...
  • 今天在學習angularjs的分頁插件時遇到了一個前端的問題,谷歌瀏覽器開發者模式調試的時候發現每次點擊分頁刷新按鈕會觸發兩次後臺請求,ajax向後臺發送了兩次請求,這對於強迫症患者來說是一個比較噁心和感到不舒服的事情。 於是在網上也找到了靠譜的解決方案:http://jqvue.com/tm.pa ...
  • 在IT界已經混了5年了,5年中瀏覽了不少的網站,在上面查詢自己想要的東西,解決工作中遇到的問題,心裡總想有天自己能夠有自己的博客,能給分享一些自己在生活中、工作中遇到的問題,讓其他有類似經歷的朋友能夠少走彎路,今天終於鼓起勇氣在博客園寫下第一篇隨筆。其他不做過多的介紹,下麵將介紹今天在工作中遇到的一 ...
  • 全局安裝是把包安裝在Node安裝目錄下的node_modules文件夾中,一般在 \Users\用戶名\AppData\Roaming\ 目錄下,可以使用npm root -g查看全局安裝目錄 本地(局部)安裝是把包安裝在指定項目(要在指定的根目錄下輸入命令)的node_modules文件夾下(若沒 ...
  • 主要用於調試,顯示信息,重點看例子在瀏覽器 JavaScript 中,通常 window 是全局對象, Node.js 中的全局對象是 global####__filename__filename 表示當前正在執行的腳本的文件名。它將輸出文件所在位置的絕對路徑,且和命令行參數所指定的文件名不一定相同 ...
  • 第一階段: C/S(client server) B/S(browser server) 網頁製作 + 技術棧: PhotoShop、HTML、CSS 第二階段: 從靜態到動態,從後端到前端 前端開發工程師 前後端分離 + 後臺: 完成數據的分析和業務邏輯編寫(包含API介面編寫) + 前端: 網頁 ...
  • 先上源碼,版本是ES6 13行常規(700bytes) shortest snake game.html 壓縮後的500bytes(當然兩處document還是可以用eval壓縮的) index.500bytes.html 之前很火的20行代碼地址(有BUG)(900bytes) hj7jay/ar ...
  • 冒泡的概念就是 當子元素觸發事件的時候 相應的祖宗十八代素也會觸發相同的事件(前提父元素也添加了一樣的事件)eg:兒子 有一個onclick 祖宗十八代 也有onclick 當點擊兒子的時候 祖宗十八代的點擊事件也會被觸發 有時候這種情況會導致很多問題 所以要阻止冒泡 只有被點擊的元素才觸發事件 不... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...