接著上一篇博客 【RocketMQ中Broker的啟動源碼分析(一)】 在完成準備工作後,調用start方法: 這裡最主要的是通過BrokerController 的start方法來完成啟動 BrokerController的start方法: 首先通過messageStore啟動messageSto ...
接著上一篇博客 【RocketMQ中Broker的啟動源碼分析(一)】
在完成準備工作後,調用start方法:
1 public static BrokerController start(BrokerController controller) { 2 try { 3 4 controller.start(); 5 6 String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", " 7 + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); 8 9 if (null != controller.getBrokerConfig().getNamesrvAddr()) { 10 tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr(); 11 } 12 13 log.info(tip); 14 System.out.printf("%s%n", tip); 15 return controller; 16 } catch (Throwable e) { 17 e.printStackTrace(); 18 System.exit(-1); 19 } 20 21 return null; 22 }
這裡最主要的是通過BrokerController 的start方法來完成啟動
BrokerController的start方法:
1 public void start() throws Exception { 2 if (this.messageStore != null) { 3 this.messageStore.start(); 4 } 5 6 if (this.remotingServer != null) { 7 this.remotingServer.start(); 8 } 9 10 if (this.fastRemotingServer != null) { 11 this.fastRemotingServer.start(); 12 } 13 14 if (this.fileWatchService != null) { 15 this.fileWatchService.start(); 16 } 17 18 if (this.brokerOuterAPI != null) { 19 this.brokerOuterAPI.start(); 20 } 21 22 if (this.pullRequestHoldService != null) { 23 this.pullRequestHoldService.start(); 24 } 25 26 if (this.clientHousekeepingService != null) { 27 this.clientHousekeepingService.start(); 28 } 29 30 if (this.filterServerManager != null) { 31 this.filterServerManager.start(); 32 } 33 34 if (!messageStoreConfig.isEnableDLegerCommitLog()) { 35 startProcessorByHa(messageStoreConfig.getBrokerRole()); 36 handleSlaveSynchronize(messageStoreConfig.getBrokerRole()); 37 } 38 39 40 41 this.registerBrokerAll(true, false, true); 42 43 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 44 45 @Override 46 public void run() { 47 try { 48 BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); 49 } catch (Throwable e) { 50 log.error("registerBrokerAll Exception", e); 51 } 52 } 53 }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); 54 55 if (this.brokerStatsManager != null) { 56 this.brokerStatsManager.start(); 57 } 58 59 if (this.brokerFastFailure != null) { 60 this.brokerFastFailure.start(); 61 } 62 63 64 }
首先通過messageStore啟動messageStore
DefaultMessageStore的start方法:
1 public void start() throws Exception { 2 lock = lockFile.getChannel().tryLock(0, 1, false); 3 if (lock == null || lock.isShared() || !lock.isValid()) { 4 throw new RuntimeException("Lock failed,MQ already started"); 5 } 6 7 lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes())); 8 lockFile.getChannel().force(true); 9 { 10 /** 11 * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog; 12 * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go; 13 * 3. Calculate the reput offset according to the consume queue; 14 * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed. 15 */ 16 long maxPhysicalPosInLogicQueue = commitLog.getMinOffset(); 17 for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { 18 for (ConsumeQueue logic : maps.values()) { 19 if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) { 20 maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset(); 21 } 22 } 23 } 24 if (maxPhysicalPosInLogicQueue < 0) { 25 maxPhysicalPosInLogicQueue = 0; 26 } 27 if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) { 28 maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset(); 29 /** 30 * This happens in following conditions: 31 * 1. If someone removes all the consumequeue files or the disk get damaged. 32 * 2. Launch a new broker, and copy the commitlog from other brokers. 33 * 34 * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0. 35 * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong. 36 */ 37 log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset()); 38 } 39 log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}", 40 maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset()); 41 this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue); 42 this.reputMessageService.start(); 43 44 /** 45 * 1. Finish dispatching the messages fall behind, then to start other services. 46 * 2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0 47 */ 48 while (true) { 49 if (dispatchBehindBytes() <= 0) { 50 break; 51 } 52 Thread.sleep(1000); 53 log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes()); 54 } 55 this.recoverTopicQueueTable(); 56 } 57 58 if (!messageStoreConfig.isEnableDLegerCommitLog()) { 59 this.haService.start(); 60 this.handleScheduleMessageService(messageStoreConfig.getBrokerRole()); 61 } 62 63 this.flushConsumeQueueService.start(); 64 this.commitLog.start(); 65 this.storeStatsService.start(); 66 67 this.createTempFile(); 68 this.addScheduleTask(); 69 this.shutdown = false; 70 }
這裡首先嘗試獲取.../store/lock文件鎖,保證磁碟上的文件只會被一個messageStore讀寫
然後通過commitLog的getMinOffset方法獲取最小的Offset
commitLog會將消息持久化為文件,每個文件預設最大1G,當超過1G,則會新創建一個文件存儲,如此反覆
而commitLog會把這些文件在物理上不連續的Offset映射成邏輯上連續的Offset,以此來定位
CommitLog的getMinOffset方法:
1 public long getMinOffset() { 2 MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile(); 3 if (mappedFile != null) { 4 if (mappedFile.isAvailable()) { 5 return mappedFile.getFileFromOffset(); 6 } else { 7 return this.rollNextFile(mappedFile.getFileFromOffset()); 8 } 9 } 10 11 return -1; 12 }
CommitLog管理的這些文件是通過mappedFileQueue管理,mappedFileQueue中會通過mappedFiles映射到每一個文件:
1 private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
MappedFileQueue的getFirstMappedFile方法:
1 public MappedFile getFirstMappedFile() { 2 MappedFile mappedFileFirst = null; 3 4 if (!this.mappedFiles.isEmpty()) { 5 try { 6 mappedFileFirst = this.mappedFiles.get(0); 7 } catch (IndexOutOfBoundsException e) { 8 //ignore 9 } catch (Exception e) { 10 log.error("getFirstMappedFile has exception.", e); 11 } 12 } 13 14 return mappedFileFirst; 15 }
這裡很簡單,在mappedFiles不為空的情況下,會取出第一個MappedFile
MappedFile 則持有與文件有關的屬性和操作:
1 public class MappedFile extends ReferenceResource { 2 protected int fileSize; 3 protected FileChannel fileChannel; 4 protected ByteBuffer writeBuffer = null; 5 private String fileName; 6 private long fileFromOffset; 7 private File file; 8 ...... 9 }
MappedFile可以通過fileChannel來完成對文件的訪問和修改
在得到第一個文件的MappedFile映射後,通過getFileFromOffset方法,獲取該文件的Offset
在DefaultMessageStore的start方法中將這個Offset作為maxPhysicalPosInLogicQueue
然後遍歷consumeQueueTable中的所有ConsumeQueue,通過ConsumeQueue可以得到消費的最大Offset
遍歷完成,maxPhysicalPosInLogicQueue就會被替換為最大的那次的消費Offset,這樣後續就可以通過這個Offset映射到具體哪個文件的哪個位置
接著調用reputMessageService的setReputFromOffset方法:
1 public void setReputFromOffset(long reputFromOffset) { 2 this.reputFromOffset = reputFromOffset; 3 }
將reputFromOffset更新為剛纔得到的Offset
然後調用reputMessageService的start方法,啟動ReputMessageService服務,ReputMessageService是一個Thread,所以是啟動了一個線程:
1 public void run() { 2 DefaultMessageStore.log.info(this.getServiceName() + " service started"); 3 4 while (!this.isStopped()) { 5 try { 6 Thread.sleep(1); 7 this.doReput(); 8 } catch (Exception e) { 9 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); 10 } 11 } 12 13 DefaultMessageStore.log.info(this.getServiceName() + " service end"); 14 }
這個線程很簡單,定時1毫秒調用doReput方法
ReputMessageService的doReput方法:
1 private void doReput() { 2 if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) { 3 log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.", 4 this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset()); 5 this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset(); 6 } 7 for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { 8 9 if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() 10 && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { 11 break; 12 } 13 14 SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); 15 if (result != null) { 16 try { 17 this.reputFromOffset = result.getStartOffset(); 18 19 for (int readSize = 0; readSize < result.getSize() && doNext; ) { 20 DispatchRequest dispatchRequest = 21 DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); 22 int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize(); 23 24 if (dispatchRequest.isSuccess()) { 25 if (size > 0) { 26 DefaultMessageStore.this.doDispatch(dispatchRequest); 27 28 if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() 29 && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { 30 DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), 31 dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, 32 dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), 33 dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); 34 } 35 36 this.reputFromOffset += size; 37 readSize += size; 38 if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { 39 DefaultMessageStore.this.storeStatsService 40 .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet(); 41 DefaultMessageStore.this.storeStatsService 42 .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) 43 .addAndGet(dispatchRequest.getMsgSize()); 44 } 45 } else if (size == 0) { 46 this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); 47 readSize = result.getSize(); 48 } 49 } else if (!dispatchRequest.isSuccess()) { 50 51 if (size > 0) { 52 log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset); 53 this.reputFromOffset += size; 54 } else { 55 doNext = false; 56 log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}", 57 this.reputFromOffset); 58 59 this.reputFromOffset += result.getSize() - readSize; 60 } 61 } 62 } 63 } finally { 64 result.release(); 65 } 66 } else { 67 doNext = false; 68 } 69 } 70 }
首先看到這個for迴圈的結束條件isCommitLogAvailable
isCommitLogAvailable方法:
1 private boolean isCommitLogAvailable() { 2 return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset(); 3 }
其中commitLog的getMaxOffset方法和getMinOffset方法相似:
1 public long getMaxOffset() { 2 MappedFile mappedFile = getLastMappedFile(); 3 if (mappedFile != null) { 4 return mappedFile.getFileFromOffset() + mappedFile.getReadPosition(); 5 } 6 return 0; 7 } 8 9 public MappedFile getLastMappedFile() { 10 MappedFile mappedFileLast = null; 11 12 while (!this.mappedFiles.isEmpty()) { 13 try { 14 mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1); 15 break; 16 } catch (IndexOutOfBoundsException e) { 17 //continue; 18 } catch (Exception e) { 19 log.error("getLastMappedFile has exception.", e); 20 break; 21 } 22 } 23 24 return mappedFileLast; 25 }
先通過getLastMappedFile得到最後一個文件的映射MappedFile
進而得到fileFromOffset,通過fileFromOffset+ReadPosition定位到當前文件讀取指針的位置
isCommitLogAvailable方法,就是判斷reputFromOffset是否達到了最後一個文件能訪問的地方
回到for迴圈,根據reputFromOffset,通過commitLog的getData方法獲取SelectMappedBufferResult
CommitLog的getData方法:
1 public SelectMappedBufferResult getData(final long offset) { 2 return this.getData(offset, offset == 0); 3 } 4 5 public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) { 6 int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); 7 MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound); 8 if (mappedFile != null) { 9 int pos = (int) (offset % mappedFileSize); 10 SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos); 11 return result; 12 } 13 14 return null; 15 }
這裡的mappedFileSize就是文件的大小,預設1G
根據reputFromOffset通過mappedFileQueue的findMappedFileByOffset方法定位具體的MappedFile文件映射
MappedFileQueue的findMappedFileByOffset方法:
1 public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) { 2 try { 3 MappedFile firstMappedFile = this.getFirstMappedFile(); 4 MappedFile lastMappedFile = this.getLastMappedFile(); 5 if (firstMappedFile != null && lastMappedFile != null) { 6 if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) { 7 LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}", 8 offset, 9 firstMappedFile.getFileFromOffset(), 10 lastMappedFile.getFileFromOffset() + this.mappedFileSize, 11 this.mappedFileSize, 12 this.mappedFiles.size()); 13 } else { 14 int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize)); 15 MappedFile targetFile = null; 16 try { 17 targetFile = this.mappedFiles.get(index); 18 } catch (Exception ignored) { 19 } 20 21 if (targetFile != null && offset >= targetFile.getFileFromOffset() 22 && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { 23 return targetFile; 24 } 25 26 for (MappedFile tmpMappedFile : this.mappedFiles) { 27 if (offset >= tmpMappedFile.getFileFromOffset() 28 && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) { 29 return tmpMappedFile; 30 } 31 } 32 } 33 34 if (returnFirstOnNotFound) { 35 return firstMappedFile; 36 } 37 } 38 } catch (Exception e) { 39 log.error("findMappedFileByOffset Exception", e); 40 } 41 42 return null; 43 }
首先檢查offset的有效性,然後通過:
1 int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
這個簡單的計算,得到offset對應的文件在mappedFiles這個list中的下標,進而得到文件映射MappedFile
回到getData,再通過offset得到MappedFile後
通過offset和mappedFileSize(1G)取餘,得到文件指針起始位置
然後調用mappedFile的selectMappedBuffer方法,得到SelectMappedBufferResult:
1 public SelectMappedBufferResult selectMappedBuffer(int pos) { 2 int readPosition = getReadPosition(); 3 if (pos < readPosition && pos >= 0) { 4 if (this.hold()) { 5 ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); 6 byteBuffer.position(pos); 7 int size = readPosition - pos; 8 ByteBuffer byteBufferNew = byteBuffer.slice(); 9 byteBufferNew.limit(size); 10 return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this); 11 } 12 } 13 14 return null; 15 }
這裡通過JDK的NIO操作,將文件從pos起始到readPosition結束的數據(所有的消息信息)放入byteBufferNew中
然後將這些信息封裝在SelectMappedBufferResult中
回到doReput方法,在得到SelectMappedBufferResult後,首先會跟新當前reputFromOffset
進入for迴圈,會將封裝好的消息從頭讀取完,通過commitLog的checkMessageAndReturnSize方法封裝成一個個的DispatchRequest
CommitLog的checkMessageAndReturnSize方法:
1 public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, 2 final boolean readBody) { 3 try { 4 // 1 TOTAL SIZE 5 int totalSize = byteBuffer.getInt(); 6 7 // 2 MAGIC CODE 8 int magicCode = byteBuffer.getInt(); 9 switch (magicCode) { 10 case MESSAGE_MAGIC_CODE: 11 break; 12 case BLANK_MAGIC_CODE: 13 return new DispatchRequest(0, true /* success */); 14 default: 15 log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode)); 16 return new DispatchRequest(-1, false /* success */); 17 } 18 19 byte[] bytesContent = new byte[totalSize]; 20 21 int bodyCRC = byteBuffer.getInt(); 22 23 int queueId = byteBuffer.getInt(); 24 25 int flag = byteBuffer.getInt(); 26 27 long queueOffset = byteBuffer.getLong(); 28 29 long physicOffset = byteBuffer.getLong(); 30 31 int sysFlag = byteBuffer.getInt(); 32 33 long bornTimeStamp = byteBuffer.getLong(); 34 35 ByteBuffer byteBuffer1 = byteBuffer.get(bytesContent, 0, 8); 36 37 long storeTimestamp = byteBuffer.getLong(); 38 39 ByteBuffer byteBuffer2 = byteBuffer.get(bytesContent, 0, 8); 40 41 int reconsumeTimes = byteBuffer.getInt(); 42 43 long preparedTransactionOffset = byteBuffer.getLong(); 44 45