消息隊列在目前分散式系統下具備非常重要的地位,如下的場景是比較適合消息隊列的: 跨系統的調用,非同步性質的調用最佳。 高併發問題,利用隊列串列特點。 訂閱模式,數據被未知數量的消費者訂閱,比如某種數據的變更會影響多個系統的數據,訂單數據就是比較好理解的。 之前有一個場景是商品數據在修改後需要推送到el ...
消息隊列在目前分散式系統下具備非常重要的地位,如下的場景是比較適合消息隊列的:
- 跨系統的調用,非同步性質的調用最佳。
- 高併發問題,利用隊列串列特點。
- 訂閱模式,數據被未知數量的消費者訂閱,比如某種數據的變更會影響多個系統的數據,訂單數據就是比較好理解的。
之前有一個場景是商品數據在修改後需要推送到elasticsearch中,由於修改產品的併發量以及數據量均不大,所以對於消息未做持久化,而且為了快速上線簡化系統,生產者與消費者更是部署在一個應用中,自生產自消費。這篇將從頭搭建RabbitMQ環境,並且將之集成在Spring boot中。
搭建RabbitMQ環境
erlang
由於RabbitMQ是基於erlang開發的,所以要安裝RabbitMQ先必須安裝erlang。
更換軟體源
使用apt-get時預設的軟體源是us.archive.ubuntu.com,這會經常發生安裝問題,比如速度特別慢或者由於下載不了造成不能安裝。
可以更換成國內的數據源cn.archive.ubuntu.com,速度那是不用說的了(這裡感謝我的同事的提醒)。找到下麵這個文件然後進行替換。
/etc/apt/sources.list :%s/us.archive/cn.archive/g
在沒有更新軟體源時,我採取的是源碼編譯安裝方法,參考這篇文章。我安裝的是最新19.2版本,安裝過程中還遇到各種問題就不一一記錄了。
測試erlang安裝是否正確,輸入erl,如果看到如下圖所示就說明安裝成功了。
安裝RabbitMQ
在未更換軟體源之前我也是選擇了源碼編譯安裝方法,安裝的最新的3.6.6,但手動啟動時總是不成功,錯誤信息如下:
版本問題
RabbitMQ 3.6.6+ erlang 19.2 啟動失敗的問題暫時未解決,有誰知道的可以告訴我。
由於啟動不成功,最後在更新成國內軟體源之後,再次通過 apt-get 安裝RabbitMQ,預設的版本是3.5.7,好像也可以選版本,以後再嘗試。可喜的是通過apt-get安裝的RabbitMQ成功的啟動起來了。可以通過如下命令查看RabbitMQ狀態。
./rabbitmqctl stauts
RabbitMQ管理工具
這是自帶的一個web插件,可以用來管理消息隊列,啟動它的方法比較簡單:
rabbitmq-plugins enable rabbitmq_management
然後重啟RabbitMQ即可生效。預設生成了guest用戶,但這個guest用戶只能在RabbitMQ所在主機才能訪問,所以要想遠程訪問就需要重新分配一個用戶,有兩種辦法:
- 通過網頁,以guest登錄然後在頁面上完成操作。
- 通過命令,創建用戶,授權也可以。
創建用戶,指定用戶名以及密碼
./rabbitmqctl add_user root root //用戶名密碼都是root
分配角色,administrator是可以操作和guest本地用戶一樣的功能,當登錄上rabbitmq_management之後,裡面的所有功能都可以使用。
rabbitmqctl set_user_tags root administrator
授權,隊列的操作管理許可權。如果不配置,那麼客戶端在連接消息隊列時會出問題。
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
上面語句我沒有執行成功,後續再研究下是不是寫法問題
Spring boot集成RabbitMQ
我們在rabbitmq_management上面可以正常訪問操作後,就可以放心的寫demo了,這裡採用spring boot。先看簡單看下RabbitMQ的簡易架構圖,容易理解下麵提到的一些組件。
-
生產者,消息,消費者
-
消息內部:Exchange,Binding,Queues
引用amqp的starter
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
增加配置信息
這裡沒有採用自動配置
mq.rabbit.host=192.168.21.128 mq.rabbit.port=5672 mq.rabbit.virtualHost=/ mq.rabbit.username=root mq.rabbit.password=root
創建RabbitMQConfig
- ConnectionFactory,類似於資料庫連接等。
@Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(this.mqRabbitHost,this.mqRabbitPort); connectionFactory.setUsername(this.mqRabbitUserName); connectionFactory.setPassword(this.mqRabbitPassword); connectionFactory.setVirtualHost(this.mqRabbitVirtualHost); connectionFactory.setPublisherConfirms(true); return connectionFactory; }
- RabbitTemplate,用來發送消息。
@Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; }
- DirectExchange
@Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE_NAME); }
- Queue,構建隊列,名稱,是否持久化之類
@Bean public Queue queue() { return new Queue(QUEUE_NAME, true); }
- Binding,將DirectExchange與Queue進行綁定
@Bean public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(ROUTING_KEY); }
- SimpleMessageListenerContainer,消費者
需要將ACK修改為手動確認,避免消息在處理過程中發生異常造成被誤認為已經成功消費的假象。
@Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception { byte[] body = message.getBody(); logger.info("消費端接收到消息 : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }); return container; }
服務端,業務邏輯,調用消息隊列。
為了讓客戶端知道消息是否已經成功,消息隊列提供了回調機制(需要實現ConfirmCallback),當消息伺服器接收到消息之後會給客戶端一個通知,此時客戶端根據消息應答來決定後續的流程。
@Service public class ProductServiceImpl extends BaseService implements ProductService, RabbitTemplate.ConfirmCallback { @Autowired private ProductMapper productMapper; private RabbitTemplate rabbitTemplate; public ProductServiceImpl(RabbitTemplate rabbitTemplate){ this.rabbitTemplate=rabbitTemplate; this.rabbitTemplate.setConfirmCallback(this); } public void confirm(CorrelationData correlationData, boolean ack, String cause) { this.logger.info(" 消息id:" + correlationData); if (ack) { this.logger.info("消息發送確認成功"); } else { this.logger.info("消息發送確認失敗:" + cause); } } @Override public void save(Product product) { //執行保存 String uuid = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData(uuid); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, product.getName(),correlationId); } }
執行結果
可以清晰的看到RabbitMQ發給生產者的信息收到的確認信息,也能看到消息被消費端消費後的信息。
RabbitMQ的其它方面
高可用方案
與常見的資料庫類似,都是主從模式來保證高可用,可以利用HAProxy來實現主從備份方案。
水平擴展方案
主要是為瞭解決垂直優化的瓶頸問題,主要有這三種:
- clustering,這是預設內置的一種集群模式,與下麵兩種不同的是clustering一般應用於同一區域網。
- federation,有待後續學習
- shovel,有待後續學習
不丟消息特性
這個不是RabbitMQ的專利,將消息持久化可以確保RabbitMQ重啟或者死機過程中不至於丟掉沒有消費的消息。
消息不被重覆消費
這點要靠消費端來完成,儘管消費端可以通過ACK來通知消息隊列消息已經被消費,但如果消費端消費了消息,此時ACK過程中的通知出現異常,消息隊列會認為消息未被消費會繼續發給消費端。
總結
初次安裝可能會出現一堆問題,特別是需要安裝所依賴的眾多包。RabbitMQ與Erlang可能存在版本依賴問題待後續確認。spring boot下集成RabbitMQ異常簡單,可以根據需求部署集群來實現可擴展高可用的消息系統。
引用
- http://www.tuicool.com/articles/AvUnE3J
- http://blog.csdn.net/liaokailin/article/details/49559571
- https://my.oschina.net/fhd/blog/375620
- http://www.jb51.net/os/Ubuntu/45293.html