## Spring安全框架 Spring Security是一個用於保護基於Java的應用程式的框架。它是一個功能強大且高度可定製的身份驗證和訪問控制框架,可以輕鬆地集成到各種應用程式中,包括Web應用程式和RESTful Web服務。Spring Security提供了全面的安全解決方案,用於身份 ...
消費原理概覽
先簡單說下常見的rocketMq的部署方式,上圖中broker為真正計算和存儲消息的地方,而nameServer負責維護broker地
圖中右側consume message部分即是本文重點描述的部分,主要分為ConsumerGroup和Consumer,consumerGroup可以參考https://rocketmq.apache.org/docs/domainModel/07consumergroup/。簡單的說,Consumer即是一個運行的應用,ComsumerGroup即為多個運行的應用組,而其中一個Consumer是如何啟動並接受消息進行消費的呢?
以常見的java應用搭配spring為例,通常來說是在應用啟動並實例化rocketmq sdk的相關類之後,調用相關類的初始化方法,獲取nameServer地址和broker地址,啟動netty客戶端,並通過netty客戶端向broker拉取消息,然後提交到消息消費服務中進行批量消費。
源碼解析
引言
https://rocketmq.apache.org/docs/quickStart/01quickstart
上面是一個普通消息消費的demo,可以看到啟動mq的消費代碼主要分為設置cid、topic、tag和messageLister。
核心啟動方法類 DefaultMQPushConsumerImpl#start
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
這裡會現有個switch方法判斷當前mq客戶端狀態然後執行不同的啟動邏輯。主要是未啟動的話,執行啟動客戶端的邏輯;若已啟動或啟動失敗的話,則拋出異常。然後是更新topic信息、發送心跳、負載均衡等。展開switch代碼塊可以看到如下代碼,同時代碼里serviceState預設為CREATE_JUST,所以在初次啟動時會進入CREATE_JUST的流程。
public synchronized void start() throws MQClientException { switch (this.serviceState) { // 初始化流程 case CREATE_JUST: log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); this.serviceState = ServiceState.START_FAILED; // 校驗設置consumerGroup的配置 this.checkConfig(); // 解析訂閱的topic和tag等數據,構建訂閱關係模型放到rebalanceImpl里 this.copySubscription(); if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID(); } // 初始化MQClientInstance、rebalaceImpl等 this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); if (this.pullAPIWrapper == null) { this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); } this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); // 初始化消費進度 // 集群模式的話,消費進度保存在服務端broker;廣播模式,保存在客戶端consumer上 if (this.defaultMQPushConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); } else { switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); } this.offsetStore.load(); // 根據是否順序or併發消費,來創建不同的消費線程服務 // 本次主要關註併發消費, if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); //POPTODO reuse Executor ? this.consumeMessagePopService = new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); //POPTODO reuse Executor ? this.consumeMessagePopService = new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } this.consumeMessageService.start(); // POPTODO this.consumeMessagePopService.start(); // 向mQClientFactory註冊當前DefaultMQPushConsumerImpl // mQClientFactory里維護了一個consumerGroup和DefaultMQPushConsumerImpl的映射表 boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown()); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } // 核心啟動方法 // 這塊是重點的啟動流程,稍後展開說明 mQClientFactory.start(); log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } this.updateTopicSubscribeInfoWhenSubscriptionChanged(); this.mQClientFactory.checkClientInBroker(); this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this.mQClientFactory.rebalanceImmediately(); }
MQClientInstance#start
可以看到org.apache.rocketmq.client.impl.factory.MQClientInstance#start方法就是啟動consumer的主要流程了
主要分瞭如下幾步,下麵會將下麵的幾步逐個展開說明:
- 拿到nameServer地址list
- 開啟客戶端和mq服務端的連接通道
- 啟動定時任務(定時調用拿nameServer地址list)
- 啟動拉取消息的服務
- 啟動負載均衡服務
- 啟動消息producer的推送服務
public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server // 這裡拿到nameServer地址list if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // Start request-response channel // 開啟客戶端和mq服務端的連接通道 this.mQClientAPIImpl.start(); // Start various schedule tasks // 啟動定時任務 // 主要是定時2分鐘調用上面fetchNameServerAddr方法,拉取最新的nameServerAddr this.startScheduledTask(); // Start pull service // 啟動拉取消息的服務 this.pullMessageService.start(); // Start rebalance service // 啟動負載均衡服務 this.rebalanceService.start(); // Start push service // 啟動消息producer的推送服務,這塊不展開了 this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); // 以上操作都成功的話,將服務置為running this.serviceState = ServiceState.RUNNING; break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } }
1. 獲取nameServerAddr
首先進入到org.apache.rocketmq.client.impl.MQClientAPIImpl#fetchNameServerAddr可以看下麵方法,可以看到主要就是調用topAddressing.fetchNSAddr方法
public String fetchNameServerAddr() { try { // 主要是執行該方法 String addrs = this.topAddressing.fetchNSAddr(); if (!UtilAll.isBlank(addrs)) { if (!addrs.equals(this.nameSrvAddr)) { log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + addrs); this.updateNameServerAddressList(addrs); this.nameSrvAddr = addrs; return nameSrvAddr; } } } catch (Exception e) { log.error("fetchNameServerAddr Exception", e); } return nameSrvAddr; }
然後點到topAddressing.fetchNSAddr里,這塊提供的spi拓展獲取nameServerAddr,否則的話,走預設方法fetchNSAddr()獲取nameServerAddr。
public final String fetchNSAddr() { // 看了下這塊應該是提供了spi拓展 // 使用方可以通過spi的方式顯示實現TopAddressing.fetchNSAddr方法獲取nameServerAddr if (!topAddressingList.isEmpty()) { for (TopAddressing topAddressing : topAddressingList) { String nsAddress = topAddressing.fetchNSAddr(); if (!Strings.isNullOrEmpty(nsAddress)) { return nsAddress; } } } // Return result of default implementation // 沒有的話,走預設實現 return fetchNSAddr(true, 3000); }
下麵是預設的獲取nameServerAddr的方法,主要是通過拿到wsAddr作為url,拼接para參數,發http請求獲取nameServerAddr。
public final String fetchNSAddr(boolean verbose, long timeoutMills) { // 拿到wsAddr作為url,拼接para參數 String url = this.wsAddr; try { if (null != para && para.size() > 0) { if (!UtilAll.isBlank(this.unitName)) { url = url + "-" + this.unitName + "?nofix=1&"; } else { url = url + "?"; } for (Map.Entry<String, String> entry : this.para.entrySet()) { url += entry.getKey() + "=" + entry.getValue() + "&"; } url = url.substring(0, url.length() - 1); } else { if (!UtilAll.isBlank(this.unitName)) { url = url + "-" + this.unitName + "?nofix=1"; } } // 發送http請求,拿到地址 HttpTinyClient.HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", timeoutMills); if (200 == result.code) { String responseStr = result.content; if (responseStr != null) { return clearNewLine(responseStr); } else { LOGGER.error("fetch nameserver address is null"); } } else { LOGGER.error("fetch nameserver address failed. statusCode=" + result.code); } } catch (IOException e) { if (verbose) { LOGGER.error("fetch name server address exception", e); } } if (verbose) { String errorMsg = "connect to " + url + " failed, maybe the domain name " + MixAll.getWSAddr() + " not bind in /etc/hosts"; errorMsg += FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL); LOGGER.warn(errorMsg); } return null; } // 預設的wsAddr獲取方式,預設值為 http://jmenv.tbsite.net:8080/rocketmq/nsaddr public static String getWSAddr() { String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP); String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr"); String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup; if (wsDomainName.indexOf(":") > 0) { wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup; } return wsAddr; }
在本地跑起來環境的話,可以看到返回值是一個ip:port的列表
2. 開啟客戶端和mq服務端的連接通道
org.apache.rocketmq.remoting.netty.NettyRemotingClient#start
這塊主要分為三個部分:
- 啟動netty客戶端
- 延遲3s掃描連接響應表
- 掃描availableNamesrvAddrMap
public void start() { // 這裡標準的編寫netty客戶端代碼 // 主要是添加了NettyClientHandler // 但是此時bootstarp還沒有綁定遠程伺服器host和埠 if (this.defaultEventExecutorGroup == null) { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyClientConfig.getClientWorkerThreads(), new ThreadFactoryImpl("NettyClientWorkerThread_")); } Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (nettyClientConfig.isUseTLS()) { if (null != sslContext) { pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc())); LOGGER.info("Prepend SSL handler"); } else { LOGGER.warn("Connections are insecure as SSLContext is null!"); } } ch.pipeline().addLast( nettyClientConfig.isDisableNettyWorkerGroup() ? null : defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyClientHandler()); } }); if (nettyClientConfig.getClientSocketSndBufSize() > 0) { LOGGER.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize()); handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()); } if (nettyClientConfig.getClientSocketRcvBufSize() > 0) { LOGGER.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize()); handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()); } if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) { LOGGER.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}", nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()); handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark( nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark())); } if (nettyClientConfig.isClientPooledByteBufAllocatorEnable()) { handler.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } TimerTask timerTaskScanResponseTable = new TimerTask() { @Override public void run(Timeout timeout) { try { NettyRemotingClient.this.scanResponseTable(); } catch (Throwable e) { LOGGER.error("scanResponseTable exception", e); } finally { timer.newTimeout(this, 1000, TimeUnit.MILLISECONDS); } } }; this.timer.newTimeout(timerTaskScanResponseTable, 1000 * 3, TimeUnit.MILLISECONDS); int connectTimeoutMillis = this.nettyClientConfig.getConnectTimeoutMillis(); TimerTask timerTaskScanAvailableNameSrv = new TimerTask() { @Override public void run(Timeout timeout) { try { NettyRemotingClient.this.scanAvailableNameSrv(); } catch (Exception e) { LOGGER.error("scanAvailableNameSrv exception", e); } finally { timer.newTimeout(this, connectTimeoutMillis, TimeUnit.MILLISECONDS); } } }; this.timer.newTimeout(timerTaskScanAvailableNameSrv, 0, TimeUnit.MILLISECONDS); }
3. 啟動定時任務
org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask
主要做啟動一些客戶端的定時操作,這裡不展開說明
private void startScheduledTask() { // 還是調用上面的獲取nameServer的方法,定時取nameServer地址列表 if (null == this.clientConfig.getNamesrvAddr()) { this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr(); } catch (Exception e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } // 定時更新topic路由信息 this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { MQClientInstance.this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS); // 定時發送信息 this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { MQClientInstance.this.cleanOfflineBroker(); MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); } catch (Exception e) { log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); // 定時持久化消費位點 this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS); // 定時調整線程池 this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { MQClientInstance.this.adjustThreadPool(); } catch (Exception e) { log.error("ScheduledTask adjustThreadPool exception", e); } }, 1, 1, TimeUnit.MINUTES); }
4. 啟動拉取消息的服務
這一部分主要是mq客戶端向broker拉取消息,然後解碼、過濾後分批交給消費服務進行消費
this.pullMessageService.start()會走到如下方法,org.apache.rocketmq.common.ServiceThread#start
public void start() { log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread); if (!started.compareAndSet(false, true)) { return; } // 設置stop標記我false,同時新起一個線程執行當前類的run方法 stopped = false; this.thread = new Thread(this, getServiceName()); this.thread.setDaemon(isDaemon); this.thread.start(); log.info("Start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread); }
pullMessageService實現了Runnable介面,會單獨起一個線程並調用如下run方法
org.apache.rocketmq.client.impl.consumer.PullMessageService#run
@Override public void run() { logger.info(this.getServiceName() + " service started"); // 上文設置了stopped為false,會到while里 while (!this.isStopped()) { try { // 從messageRequestQueue里取一個MessageRequest執行。若隊列為空則堵塞 MessageRequest messageRequest = this.messageRequestQueue.take(); if (messageRequest.getMessageRequestMode() == MessageRequestMode.POP) { this.popMessage((PopRequest) messageRequest); } else { this.pullMessage((PullRequest) messageRequest); } } catch (InterruptedException ignored) { } catch (Exception e) { logger.error("Pull Message Service Run Method exception", e); } } logger.info(this.getServiceName() + " service end"); }
以pullMessage方式為例,進入如下方法
org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessage
private void pullMessage(final PullRequest pullRequest) { // 根據PullRequest的消費組名從map中獲取一個MQConsumerInner final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { // consumer不為空執行拉消息方法 DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest); } else { logger.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); } }
下麵展開說明DefaultMQPushConsumerImpl#pullMessage方法
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
public void pullMessage(final PullRequest pullRequest) { // 獲取處理隊列 final ProcessQueue processQueue = pullRequest.getProcessQueue(); // droped為true直接返回 if (processQueue.isDropped()) { log.info("the pull request[{}] is dropped.", pullRequest.toString()); return; } // 設置最近的拉取時間戳 pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); // 如果狀態不是running的話,則將pullRequest延遲3秒放進messageRequestQueue,並返回 // executePullRequestLater方法操作即是延遲pullTimeDelayMillsWhenException時間將pullRequest放進messageRequestQueue尾部 // executePullRequestLater方法通過上述操作操作,來打到隨後執行pullRequest目的 try { this.makeSureStateOK(); } catch (MQClientException e) { log.warn("pullMessage exception, consumer state not ok", e); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); return; } // pause為true的話,延遲1s將pullRequest放進messageRequestQueue,並返回 if (this.isPause()) { log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup()); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); return; } // 下麵這部分為消息拉取的流控代碼 long cachedMessageCount = processQueue.getMsgCount().get(); long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); // 當前處理隊列消息超過1000時,延遲50ms執行該pullRequest;該操作超過1000次,列印日誌 if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } // 當前處理隊列消息占用空間超過100mb時,延遲50ms後執行該pullRequest if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } // 非順序消息的話 if (!this.consumeOrderly) { // 如果處理隊列里的的最大消息和最小消息的queueSize差值大於2000的話,流控延後處理 if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL); if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { log.warn( "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, queueMaxSpanFlowControlTimes); } return; } // 下麵為順序消息,本次先不分析 } else { if (processQueue.isLocked()) { if (!pullRequest.isPreviouslyLocked()) { long offset = -1L; try { offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue()); if (offset < 0) { throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset); } } catch (Exception e) { this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e); return; } boolean brokerBusy = offset < pullRequest.getNextOffset(); log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", pullRequest, offset, brokerBusy); if (brokerBusy) { log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", pullRequest, offset); } pullRequest.setPreviouslyLocked(true); pullRequest.setNextOffset(offset); } } else { this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.info("pull message later because not locked in broker, {}", pullRequest); return; } } // 拿到該消息topic的訂閱信息,topic tag 表達式等數據 final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (null == subscriptionData) { this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.warn("find the consumer's subscription failed, {}", pullRequest); return; } final long beginTimestamp = System.currentTimeMillis(); // 下麵是拉取消息的回調函數,等下展開說明 PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT); long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) { log.warn( "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset); } break; case NO_NEW_MSG: case NO_MATCHED_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case OFFSET_ILLEGAL: log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString()); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); pullRequest.getProcessQueue().setDropped(true); DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() { @Override public void run() { try { DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false); DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); log.warn("fix the pull request offset, {}", pullRequest); } catch (Throwable e) { log.error("executeTaskLater Exception", e); } } }, 10000); break; default: break; } } } @Override public void onException(Throwable e) { if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("execute the pull request exception", e); } if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL); } else { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } } }; boolean commitOffsetEnable = false; long commitOffsetValue = 0L; if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) { commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY); if (commitOffsetValue > 0) { commitOffsetEnable = true; } } // 拿到訂閱表達式 類過濾標識等 String subExpression = null; boolean classFilter = false; SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (sd != null) { if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) { subExpression = sd.getSubString(); } classFilter = sd.isClassFilterMode(); } // 構建標記 int sysFlag = PullSysFlag.buildSysFlag( commitOffsetEnable, // commitOffset true, // suspend subExpression != null, // subscription classFilter // class filter ); try { // 執行真正拉取消息的方法,並傳輸回調函數pullCallBack this.pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), this.defaultMQPushConsumer.getPullBatchSizeInBytes(), sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, CommunicationMode.ASYNC, pullCallback ); } catch (Exception e) { log.error("pullKernelImpl exception", e); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } }
4.1. 真正拉取消息
這一部分主要是通過netty客戶端向broker拉取消息
如下方法
org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl(org.apache.rocketmq.common.message.MessageQueue, java.lang.String, java.lang.String, long, long, int, int, int, long, long, long,
org.apache.rocketmq.client.impl.CommunicationMode, org.apache.rocketmq.client.consumer.PullCallback) public PullResult pullKernelImpl( final MessageQueue mq, final String subExpression, final String expressionType, final long subVersion, final long offset, final int maxNums, final int maxSizeInBytes, final int sysFlag, final long commitOffset, final long brokerSuspendMaxTimeMillis, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 根據messageQueue拿到broker信息,包括broker地址、是否slave、版本等 FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), this.recalculatePullFromWhichNode(mq), false); // 拿不到的話,從nameServer更新topic信息後,重新拿 if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), this.recalculatePullFromWhichNode(mq), false); } if (findBrokerResult != null) { { // check version if (!ExpressionType.isTagType(expressionType) && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) { throw new MQClientException("The broker[" + mq.getBrokerName() + ", " + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null); } } int sysFlagInner = sysFlag; if (findBrokerResult.isSlave()) { sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner); } // 組裝請求頭 PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.consumerGroup); requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueOffset(offset); requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlagInner); requestHeader.setCommitOffset(commitOffset); requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); requestHeader.setMaxMsgBytes(maxSizeInBytes); requestHeader.setExpressionType(expressionType); requestHeader.setBname(mq.getBrokerName()); // 拿到broker地址 String brokerAddr = findBrokerResult.getBrokerAddr(); if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr); } // 通過mq客戶端拉取消息 PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback); return pullResult; } throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); }
org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessage
public PullResult pullMessage( final String addr, final PullMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request; // 這個先不糾結,根據不同的code組裝不同的RemotingCommand if (PullSysFlag.hasLitePullFlag(requestHeader.getSysFlag())) { request = RemotingCommand.createRequestCommand(RequestCode.LITE_PULL_MESSAGE, requestHeader); } else { request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); } // 上文的communicationMode欄位傳入的是ASYNC,所以這裡走的ASYNC模式 switch (communicationMode) { case ONEWAY: assert false; return null; case ASYNC: this.pullMessageAsync(addr, request, timeoutMillis, pullCallback); return null; case SYNC: return this.pullMessageSync(addr, request, timeoutMillis); default: assert false; break; } return null; }
進入org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync
可以看到remotingClient為netty客戶端,通過netty想broker獲取消息,
private void pullMessageAsync( final String addr, final RemotingCommand request, final long timeoutMillis, final PullCallback pullCallback ) throws RemotingException, InterruptedException { // 通過netty客戶端想broker獲取消息,具體不在贅述 // 這裡傳入InvokeCallback,在拉消息完成後執行InvokeCallback this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override // 拿到消息後 public void operationComplete(ResponseFuture responseFuture) { RemotingCommand response = responseFuture.getResponseCommand(); if (response != null) { try { // 處理response解析相關請求頭和body放到pullResult里 PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr); assert pullResult != null; // 拉取成功的話執行success回調函數,處理pullResult pullCallback.onSuccess(pullResult); } catch (Exception e) { pullCallback.onException(e); } } else { if (!responseFuture.isSendRequestOK()) { pullCallback.onException(new MQClientException(ClientErrorCode.CONNECT_BROKER_EXCEPTION, "send request failed to " + addr + ". Request: " + request, responseFuture.getCause())); } else if (responseFuture.isTimeout()) { pullCallback.onException(new MQClientException(ClientErrorCode.ACCESS_BROKER_TIMEOUT, "wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request, responseFuture.getCause())); } else { pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause())); } } } }); }
4.2. 執行success回調函數
這一部分主要是在拉取消息成功後,執行回調函數解碼、過濾消息列表,然後提交消費服務分批消費
重點看拉取狀態為FOUND 的情況,
PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { // 這裡解析pullResult,將pullResult的messageBinary(byte數組)解碼為MessageExt列表 // 再根據subscriptionData里的tag classFilter等過濾MessageExt列表 // 將過濾後的msgListFilterAgain列表塞道pullResult pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); // 設置下一次偏移量 pullRequest.setNextOffset(pullResult.getNextBeginOffset()); // 計算拉取rt long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT); // 如果本次拉取消息量為空,立即將pullRequest返回隊列里,等後續繼續拉 long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { // 拉到消息的情況如下 firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); // 計算拉取的TPS DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); // 把消息放到處理隊列里 boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); // 向消費服務提交消費請求 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else {