從頭開始搭建一個Spring boot+RabbitMQ環境

来源:http://www.cnblogs.com/ASPNET2008/archive/2017/02/18/6414145.html
-Advertisement-
Play Games

消息隊列在目前分散式系統下具備非常重要的地位,如下的場景是比較適合消息隊列的: 跨系統的調用,非同步性質的調用最佳。 高併發問題,利用隊列串列特點。 訂閱模式,數據被未知數量的消費者訂閱,比如某種數據的變更會影響多個系統的數據,訂單數據就是比較好理解的。 之前有一個場景是商品數據在修改後需要推送到el ...


消息隊列在目前分散式系統下具備非常重要的地位,如下的場景是比較適合消息隊列的:

  • 跨系統的調用,非同步性質的調用最佳。
  • 高併發問題,利用隊列串列特點。
  • 訂閱模式,數據被未知數量的消費者訂閱,比如某種數據的變更會影響多個系統的數據,訂單數據就是比較好理解的。

之前有一個場景是商品數據在修改後需要推送到elasticsearch中,由於修改產品的併發量以及數據量均不大,所以對於消息未做持久化,而且為了快速上線簡化系統,生產者與消費者更是部署在一個應用中,自生產自消費。這篇將從頭搭建RabbitMQ環境,並且將之集成在Spring boot中。

搭建RabbitMQ環境

erlang

由於RabbitMQ是基於erlang開發的,所以要安裝RabbitMQ先必須安裝erlang。

更換軟體源

使用apt-get時預設的軟體源是us.archive.ubuntu.com,這會經常發生安裝問題,比如速度特別慢或者由於下載不了造成不能安裝。

可以更換成國內的數據源cn.archive.ubuntu.com,速度那是不用說的了(這裡感謝我的同事的提醒)。找到下麵這個文件然後進行替換。

/etc/apt/sources.list

:%s/us.archive/cn.archive/g 

在沒有更新軟體源時,我採取的是源碼編譯安裝方法,參考這篇文章。我安裝的是最新19.2版本,安裝過程中還遇到各種問題就不一一記錄了。

http://erlang.org/doc/installation_guide/INSTALL.html

測試erlang安裝是否正確,輸入erl,如果看到如下圖所示就說明安裝成功了。

安裝RabbitMQ

在未更換軟體源之前我也是選擇了源碼編譯安裝方法,安裝的最新的3.6.6,但手動啟動時總是不成功,錯誤信息如下:

版本問題

RabbitMQ 3.6.6+ erlang 19.2 啟動失敗的問題暫時未解決,有誰知道的可以告訴我。

由於啟動不成功,最後在更新成國內軟體源之後,再次通過 apt-get 安裝RabbitMQ,預設的版本是3.5.7,好像也可以選版本,以後再嘗試。可喜的是通過apt-get安裝的RabbitMQ成功的啟動起來了。可以通過如下命令查看RabbitMQ狀態。

./rabbitmqctl stauts
RabbitMQ管理工具

這是自帶的一個web插件,可以用來管理消息隊列,啟動它的方法比較簡單:

rabbitmq-plugins enable rabbitmq_management

然後重啟RabbitMQ即可生效。預設生成了guest用戶,但這個guest用戶只能在RabbitMQ所在主機才能訪問,所以要想遠程訪問就需要重新分配一個用戶,有兩種辦法:

  • 通過網頁,以guest登錄然後在頁面上完成操作。
  • 通過命令,創建用戶,授權也可以。

創建用戶,指定用戶名以及密碼

./rabbitmqctl add_user root root //用戶名密碼都是root

分配角色,administrator是可以操作和guest本地用戶一樣的功能,當登錄上rabbitmq_management之後,裡面的所有功能都可以使用。

rabbitmqctl set_user_tags root administrator

授權,隊列的操作管理許可權。如果不配置,那麼客戶端在連接消息隊列時會出問題。

rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

上面語句我沒有執行成功,後續再研究下是不是寫法問題

Spring boot集成RabbitMQ

我們在rabbitmq_management上面可以正常訪問操作後,就可以放心的寫demo了,這裡採用spring boot。先看簡單看下RabbitMQ的簡易架構圖,容易理解下麵提到的一些組件。

    • 生產者,消息,消費者

 

    • 消息內部:Exchange,Binding,Queues

引用amqp的starter

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

增加配置信息

這裡沒有採用自動配置

mq.rabbit.host=192.168.21.128
mq.rabbit.port=5672
mq.rabbit.virtualHost=/
mq.rabbit.username=root
mq.rabbit.password=root

創建RabbitMQConfig

  • ConnectionFactory,類似於資料庫連接等。
@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(this.mqRabbitHost,this.mqRabbitPort);

    connectionFactory.setUsername(this.mqRabbitUserName);
    connectionFactory.setPassword(this.mqRabbitPassword);
    connectionFactory.setVirtualHost(this.mqRabbitVirtualHost);
    connectionFactory.setPublisherConfirms(true);

    return connectionFactory;
}
  • RabbitTemplate,用來發送消息。
@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    return template;
}
  • DirectExchange
@Bean
public DirectExchange defaultExchange() {
    return new DirectExchange(EXCHANGE_NAME);
}
  • Queue,構建隊列,名稱,是否持久化之類
@Bean
public Queue queue() {
    return new Queue(QUEUE_NAME, true);
}
  • Binding,將DirectExchange與Queue進行綁定
@Bean
public Binding binding() {
    return BindingBuilder.bind(queue()).to(defaultExchange()).with(ROUTING_KEY);
}
  • SimpleMessageListenerContainer,消費者

需要將ACK修改為手動確認,避免消息在處理過程中發生異常造成被誤認為已經成功消費的假象。

@Bean
public SimpleMessageListenerContainer messageContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
    container.setQueues(queue());
    container.setExposeListenerChannel(true);
    container.setMaxConcurrentConsumers(1);
    container.setConcurrentConsumers(1);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    container.setMessageListener(new ChannelAwareMessageListener() {

        public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
            byte[] body = message.getBody();
            logger.info("消費端接收到消息 : " + new String(body));
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    });
    return container;
}

服務端,業務邏輯,調用消息隊列。

為了讓客戶端知道消息是否已經成功,消息隊列提供了回調機制(需要實現ConfirmCallback),當消息伺服器接收到消息之後會給客戶端一個通知,此時客戶端根據消息應答來決定後續的流程。

@Service
public class ProductServiceImpl extends BaseService implements ProductService, RabbitTemplate.ConfirmCallback {

    @Autowired
    private ProductMapper productMapper;

    private RabbitTemplate rabbitTemplate;

    public ProductServiceImpl(RabbitTemplate rabbitTemplate){
        this.rabbitTemplate=rabbitTemplate;
        this.rabbitTemplate.setConfirmCallback(this);
    }

    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        this.logger.info(" 消息id:" + correlationData);
        if (ack) {
            this.logger.info("消息發送確認成功");
        } else {
            this.logger.info("消息發送確認失敗:" + cause);

        }
    }

    @Override
    public void save(Product product) {

        //執行保存
        String uuid = UUID.randomUUID().toString();
        CorrelationData correlationId = new CorrelationData(uuid);
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, product.getName(),correlationId);
    }
}

執行結果

可以清晰的看到RabbitMQ發給生產者的信息收到的確認信息,也能看到消息被消費端消費後的信息。

RabbitMQ的其它方面

高可用方案

與常見的資料庫類似,都是主從模式來保證高可用,可以利用HAProxy來實現主從備份方案。

水平擴展方案

主要是為瞭解決垂直優化的瓶頸問題,主要有這三種:

  • clustering,這是預設內置的一種集群模式,與下麵兩種不同的是clustering一般應用於同一區域網。
  • federation,有待後續學習
  • shovel,有待後續學習

不丟消息特性

這個不是RabbitMQ的專利,將消息持久化可以確保RabbitMQ重啟或者死機過程中不至於丟掉沒有消費的消息。

消息不被重覆消費

這點要靠消費端來完成,儘管消費端可以通過ACK來通知消息隊列消息已經被消費,但如果消費端消費了消息,此時ACK過程中的通知出現異常,消息隊列會認為消息未被消費會繼續發給消費端。

總結

初次安裝可能會出現一堆問題,特別是需要安裝所依賴的眾多包。RabbitMQ與Erlang可能存在版本依賴問題待後續確認。spring boot下集成RabbitMQ異常簡單,可以根據需求部署集群來實現可擴展高可用的消息系統。

引用


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 首先引入下麵這段生產者和消費者的程式,店員類作為生產產品和消費產品的中介,其中的數據product為共用數據,產品最多只能囤積5個,當產品達到5個還在生產時,就會提示“產品已滿!”,類似地,如果產品只有0個了還在消費,會提示“缺貨!”: 運行程式,結果如下: 這是一種不好的情況,因為當產品已滿時,還 ...
  • package com.open1111.jsoup; import org.apache.http.HttpEntity;import org.apache.http.client.methods.CloseableHttpResponse;import org.apache.http.clien ...
  • 在 "java註解生成xml和包含CDATA問題" 裡面做了介紹,這裡直接貼代碼。 1:生成xml的java文件 setEscapeText 設置為false,最為重要 2:生成後的xml,上面的第三種才能生成正確的結果! 文件:在D:/test.xml 3: 參考文章 (1): "Dom4j格式化 ...
  • 百度java生成xml,有一大推的文章,主要的生成方式一種使用Dom4J ,還有一種使用Jdk自帶註解類! 下麵主要整理我註解類的使用,(可以參考這篇文章 "Dom4J生成xml和包含CDATA問題" )和xml中CDATA 問題的解決方法! 1:要生成的xml原始文件! 2:xml對應的model ...
  • 不少新改進。 https://blog.golang.org/go1.8 不過據說有issue:http服務比上個版本慢些。對於普通用戶來說影響應該不大,繼續關註。 創意設計,版權所有 :) ...
  • 使用new初始化對象中的指針成員時遇到的問題 在構造函數中使用new初始化指針成員,那麼析構函數中就必須使delete,並且new對應delete, new[]則對應於delete[]。 在有多個構造函數的情況下,必須以相同的方式使用new,要不用new,要不用new[],因為只存在一個析構函數,所 ...
  • cookie,很多網站都會用的一個機制,可以保存用戶的相關信息,token等等,很多人熟知的應該是第一方cookie,可以針對二級功能變數名稱進行信息的保存,如果遇到跨域的情況,那麼第一方cookie是沒有用的,因為他做不到跨域。但是可以利用第三方cookie來實現這一的機制,第三方cookie不僅可以存儲 ...
  • http://blog.csdn.net/jnqqls/article/details/17723417 通過對EJB系列的總結和學習我們已經對EJB有了基本的瞭解,但是為了更進一步的去深入學習EJB,我們很有必要將它拿出來跟之前非常熟悉的spring進行一下對比,通過對比來瞭解這兩個內容的相同與不 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...