上一篇博客的最後簡單提了下CommitLog的刷盤 【RocketMQ中Broker的消息存儲源碼分析】 (這篇博客和上一篇有很大的聯繫) Broker的CommitLog刷盤會啟動一個線程,不停地將緩衝區的內容寫入磁碟(CommitLog文件)中,主要分為非同步刷盤和同步刷盤 非同步刷盤又可以分為兩種 ...
上一篇博客的最後簡單提了下CommitLog的刷盤 【RocketMQ中Broker的消息存儲源碼分析】 (這篇博客和上一篇有很大的聯繫)
Broker的CommitLog刷盤會啟動一個線程,不停地將緩衝區的內容寫入磁碟(CommitLog文件)中,主要分為非同步刷盤和同步刷盤
非同步刷盤又可以分為兩種方式:
①緩存到mappedByteBuffer -> 寫入磁碟(包括同步刷盤)
②緩存到writeBuffer -> 緩存到fileChannel -> 寫入磁碟 (前面說過的開啟記憶體位元組緩衝區情況下)
CommitLog的兩種刷盤模式:
1 public enum FlushDiskType { 2 SYNC_FLUSH, 3 ASYNC_FLUSH 4 }
同步和非同步,同步刷盤由GroupCommitService實現,非同步刷盤由FlushRealTimeService實現,預設採用非同步刷盤
在採用非同步刷盤的模式下,若是開啟記憶體位元組緩衝區,那麼會在FlushRealTimeService的基礎上開啟CommitRealTimeService
同步刷盤:
啟動GroupCommitService線程:
1 public void run() { 2 CommitLog.log.info(this.getServiceName() + " service started"); 3 4 while (!this.isStopped()) { 5 try { 6 this.waitForRunning(10); 7 this.doCommit(); 8 } catch (Exception e) { 9 CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); 10 } 11 } 12 13 // Under normal circumstances shutdown, wait for the arrival of the 14 // request, and then flush 15 try { 16 Thread.sleep(10); 17 } catch (InterruptedException e) { 18 CommitLog.log.warn("GroupCommitService Exception, ", e); 19 } 20 21 synchronized (this) { 22 this.swapRequests(); 23 } 24 25 this.doCommit(); 26 27 CommitLog.log.info(this.getServiceName() + " service end"); 28 }
通過迴圈中的doCommit不斷地進行刷盤
doCommit方法:
1 private void doCommit() { 2 synchronized (this.requestsRead) { 3 if (!this.requestsRead.isEmpty()) { 4 for (GroupCommitRequest req : this.requestsRead) { 5 // There may be a message in the next file, so a maximum of 6 // two times the flush 7 boolean flushOK = false; 8 for (int i = 0; i < 2 && !flushOK; i++) { 9 flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); 10 11 if (!flushOK) { 12 CommitLog.this.mappedFileQueue.flush(0); 13 } 14 } 15 16 req.wakeupCustomer(flushOK); 17 } 18 19 long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); 20 if (storeTimestamp > 0) { 21 CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); 22 } 23 24 this.requestsRead.clear(); 25 } else { 26 // Because of individual messages is set to not sync flush, it 27 // will come to this process 28 CommitLog.this.mappedFileQueue.flush(0); 29 } 30 } 31 }
其中在GroupCommitService中管理著兩張List:
1 private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); 2 private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
GroupCommitRequest中封裝了一個Offset
1 private final long nextOffset;
這裡就需要看到上一篇博客結尾提到的handleDiskFlush方法:
1 public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { 2 // Synchronization flush 3 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { 4 final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; 5 if (messageExt.isWaitStoreMsgOK()) { 6 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); 7 service.putRequest(request); 8 boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); 9 if (!flushOK) { 10 log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() 11 + " client address: " + messageExt.getBornHostString()); 12 putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); 13 } 14 } else { 15 service.wakeup(); 16 } 17 } 18 // Asynchronous flush 19 else { 20 if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { 21 flushCommitLogService.wakeup(); 22 } else { 23 commitLogService.wakeup(); 24 } 25 } 26 }
這個方法的調用發生在Broker接收到來自Producer的消息,並且完成了向ByteBuffer的寫入
可以看到,在同步刷盤SYNC_FLUSH模式下,會從AppendMessageResult 中取出WroteOffset以及WroteBytes從而計算出nextOffset,把這個nextOffset封裝到GroupCommitRequest中,然後通過GroupCommitService 的putRequest方法,將GroupCommitRequest添加到requestsWrite這個List中
putRequest方法:
1 public synchronized void putRequest(final GroupCommitRequest request) { 2 synchronized (this.requestsWrite) { 3 this.requestsWrite.add(request); 4 } 5 if (hasNotified.compareAndSet(false, true)) { 6 waitPoint.countDown(); // notify 7 } 8 }
在完成List的add操作後,會通過CAS操作修改hasNotified這個原子化的Boolean值,同時通過waitPoint的countDown進行喚醒操作,在後面會有用
由於這裡這裡是同步刷盤,所以需要通過GroupCommitRequest的waitForFlush方法,在超時時間內等待該記錄對應的刷盤完成
而非同步刷盤會通過wakeup方法喚醒刷盤任務,並沒有進行等待,這就是二者區別
回到doCommit方法中,這時會發現這裡是對requestsRead這條List進行的操作,而剛纔是將記錄存放在requestsWrite這條List中的
這就和在run方法中的waitForRunning方法有關了:
1 protected void waitForRunning(long interval) { 2 if (hasNotified.compareAndSet(true, false)) { 3 this.onWaitEnd(); 4 return; 5 } 6 7 //entry to wait 8 waitPoint.reset(); 9 10 try { 11 waitPoint.await(interval, TimeUnit.MILLISECONDS); 12 } catch (InterruptedException e) { 13 log.error("Interrupted", e); 14 } finally { 15 hasNotified.set(false); 16 this.onWaitEnd(); 17 } 18 }
這裡通過CAS操作修改hasNotified值,從而調用onWaitEnd方法;如果修改失敗,則因為await進入阻塞,等待上面所說的putRequest方法將其喚醒,也就是說當Producer發送的消息被緩存成功後,調用handleDiskFlush方法後,喚醒刷盤線工作,當然刷盤線程在達到超時時間interval後也會喚醒
再來看看onWaitEnd方法:
1 protected void onWaitEnd() { 2 this.swapRequests(); 3 } 4 5 private void swapRequests() { 6 List<GroupCommitRequest> tmp = this.requestsWrite; 7 this.requestsWrite = this.requestsRead; 8 this.requestsRead = tmp; 9 }
可以看到,這裡是將兩個List進行了交換
這是一個非常有趣的做法,如果熟悉JVM的話,有沒有覺得這其實很像新生代的複製演算法!
當刷盤線程阻塞的時候,requestsWrite中會填充記錄,當刷盤線程被喚醒工作的時候,首先會將requestsWrite和requestsRead進行交換,那麼此時的記錄就是從requestsRead中讀取的了,而同時requestsWrite會變為空的List,消息記錄就會往這個空的List中填充,如此往複
可以看到doCommit方法中,當requestsRead不為空的時候,在最後會調用requestsRead的clear方法,由此證明瞭我上面的說法
仔細來看看是如何進行刷盤的:
1 for (GroupCommitRequest req : this.requestsRead) { 2 // There may be a message in the next file, so a maximum of 3 // two times the flush 4 boolean flushOK = false; 5 for (int i = 0; i < 2 && !flushOK; i++) { 6 flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); 7 8 if (!flushOK) { 9 CommitLog.this.mappedFileQueue.flush(0); 10 } 11 } 12 13 req.wakeupCustomer(flushOK); 14 }
通過遍歷requestsRead,可以到得到GroupCommitRequest封裝的NextOffset
其中flushedWhere是用來記錄上一次刷盤完成後的offset,若是上一次的刷盤位置大於等於NextOffset,就說明從NextOffset位置起始已經被刷新過了,不需要刷新,否則調用mappedFileQueue的flush方法進行刷盤
MappedFileQueue的flush方法:
1 public boolean flush(final int flushLeastPages) { 2 boolean result = true; 3 MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); 4 if (mappedFile != null) { 5 long tmpTimeStamp = mappedFile.getStoreTimestamp(); 6 int offset = mappedFile.flush(flushLeastPages); 7 long where = mappedFile.getFileFromOffset() + offset; 8 result = where == this.flushedWhere; 9 this.flushedWhere = where; 10 if (0 == flushLeastPages) { 11 this.storeTimestamp = tmpTimeStamp; 12 } 13 } 14 15 return result; 16 }
這裡首先根據flushedWhere上一次刷盤完成後的offset,通過findMappedFileByOffset方法,找到CommitLog文件的映射MappedFile
有關MappedFile及其相關操作在我之前的博客中介紹過很多次,就不再累贅
再找到MappedFile後,調用其flush方法:
MappedFile的flush方法:
1 public int flush(final int flushLeastPages) { 2 if (this.isAbleToFlush(flushLeastPages)) { 3 if (this.hold()) { 4 int value = getReadPosition(); 5 6 try { 7 //We only append data to fileChannel or mappedByteBuffer, never both. 8 if (writeBuffer != null || this.fileChannel.position() != 0) { 9 this.fileChannel.force(false); 10 } else { 11 this.mappedByteBuffer.force(); 12 } 13 } catch (Throwable e) { 14 log.error("Error occurred when force data to disk.", e); 15 } 16 17 this.flushedPosition.set(value); 18 this.release(); 19 } else { 20 log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); 21 this.flushedPosition.set(getReadPosition()); 22 } 23 } 24 return this.getFlushedPosition(); 25 }
首先isAbleToFlush方法:
1 private boolean isAbleToFlush(final int flushLeastPages) { 2 int flush = this.flushedPosition.get(); 3 int write = getReadPosition(); 4 5 if (this.isFull()) { 6 return true; 7 } 8 9 if (flushLeastPages > 0) { 10 return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages; 11 } 12 13 return write > flush; 14 }
其中flush記錄的是上一次完成刷新後的位置,write記錄的是當前消息內容寫入後的位置
當flushLeastPages 大於0的時候,通過:
1 return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
可以計算出是否滿足page的要求,其中OS_PAGE_SIZE是4K,也就是說1個page大小是4k
由於這裡是同步刷盤,flushLeastPages是0,不對page要求,只要有緩存有內容就會刷盤;但是在非同步刷盤中,flushLeastPages是4,也就是說,只有當緩存的消息至少是4(page個數)*4K(page大小)= 16K時,非同步刷盤才會將緩存寫入文件
回到MappedFile的flush方法,在通過isAbleToFlush檢查完寫入要求後
1 int value = getReadPosition(); 2 try { 3 //We only append data to fileChannel or mappedByteBuffer, never both. 4 if (writeBuffer != null || this.fileChannel.position() != 0) { 5 this.fileChannel.force(false); 6 } else { 7 this.mappedByteBuffer.force(); 8 } 9 } catch (Throwable e) { 10 log.error("Error occurred when force data to disk.", e); 11 } 12 13 this.flushedPosition.set(value);
首先通過getReadPosition獲取當前消息內容寫入後的位置,由於是同步刷盤,所以這裡調用mappedByteBuffer的force方法,通過JDK的NIO操作,將mappedByteBuffer緩存中的數據寫入CommitLog文件中
最後更新flushedPosition的值
再回到MappedFileQueue的flush方法,在完成MappedFile的flush後,還需要更新flushedWhere的值
此時緩存中的數據完成了持久化,同步刷盤結束
非同步刷盤:
①FlushCommitLogService:
1 public void run() { 2 CommitLog.log.info(this.getServiceName() + " service started"); 3 4 while (!this.isStopped()) { 5 boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed(); 6 7 int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); 8 int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); 9 10 int flushPhysicQueueThoroughInterval = 11 CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); 12 13 boolean printFlushProgress = false; 14 15 // Print flush progress 16 long currentTimeMillis = System.currentTimeMillis(); 17 if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) { 18 this.lastFlushTimestamp = currentTimeMillis; 19 flushPhysicQueueLeastPages = 0; 20 printFlushProgress = (printTimes++ % 10) == 0; 21 } 22 23 try { 24 if (flushCommitLogTimed) { 25 Thread.sleep(interval); 26 } else { 27 this.waitForRunning(interval); 28 } 29 30 if (printFlushProgress) { 31 this.printFlushProgress(); 32 } 33 34 long begin = System.currentTimeMillis(); 35 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); 36 long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); 37 if (storeTimestamp > 0) { 38 CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); 39 } 40 long past = System.currentTimeMillis() - begin; 41 if (past > 500) { 42 log.info("Flush data to disk costs {} ms", past); 43 } 44 } catch (Throwable e) { 45 CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); 46 this.printFlushProgress(); 47 } 48 } 49 50 // Normal shutdown, to ensure that all the flush before exit 51 boolean result = false; 52 for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { 53 result = CommitLog.this.mappedFileQueue.flush(0); 54 CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); 55 } 56 57 this.printFlushProgress(); 58 59 CommitLog.log.info(this.getServiceName() + " service end"); 60 }
flushCommitLogTimed:是否使用定時刷盤
interval:刷盤時間間隔,預設500ms
flushPhysicQueueLeastPages:page大小,預設4個
flushPhysicQueueThoroughInterval:徹底刷盤時間間隔,預設10s
首先根據lastFlushTimestamp(上一次刷盤時間)+ flushPhysicQueueThoroughInterval和當前時間比較,判斷是否需要進行一次徹底刷盤,若達到了需要則將flushPhysicQueueLeastPages置為0
接著根據flushCommitLogTimed判斷
當flushCommitLogTimed為true,使用sleep等待500ms
當flushCommitLogTimed為false,調用waitForRunning在超時時間為500ms下阻塞,其喚醒條件也就是在handleDiskFlush中的wakeup喚醒
最後,和同步刷盤一樣,調用mappedFileQueue的flush方法
只不過,這裡的flushPhysicQueueLeastPages決定了其是進行徹底刷新,還是按4page(16K)的標準刷新
②CommitRealTimeService
這種刷盤方式需要和FlushCommitLogService配合
CommitRealTimeService的run方法:
1 public void run() { 2 CommitLog.log.info(this.getServiceName() + " service started"); 3 while (!this.isStopped()) { 4 int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); 5 6 int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); 7 8 int commitDataThoroughInterval = 9 CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); 10 11 long begin = System.currentTimeMillis(); 12 if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { 13 this.lastCommitTimestamp = begin; 14 commitDataLeastPages = 0; 15 } 16 17 try { 18 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); 19 long end = System.currentTimeMillis(); 20 if (!result) { 21 this.lastCommitTimestamp = end; // result = false means some data committed. 22 //now wake up flush thread. 23 flushCommitLogService.wakeup(); 24 } 25 26 if (end - begin > 500) { 27 log.info("Commit data to file costs {} ms", end - begin); 28 } 29 this.waitForRunning(interval); 30 } catch (Throwable e) { 31 CommitLog.log.error(this.getServiceName() + " service has exception. ", e); 32 } 33 } 34 35 boolean result = false; 36 for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { 37 result = CommitLog.this.mappedFileQueue.commit(0); 38 CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); 39 } 40 CommitLog.log.info(this.getServiceName() + " service end"); 41 }
這裡的邏輯和FlushCommitLogService中相似,之不過參數略有不同
interval:提交時間間隔,預設200ms
commitDataLeastPages:page大小,預設4個
commitDataThoroughInterval:提交完成時間間隔,預設200ms
基本和FlushCommitLogService相似,只不過調用了mappedFileQueue的commit方法
1 public boolean commit(final int commitLeastPages) { 2 boolean result = true; 3 MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0); 4 if (mappedFile != null) { 5 int offset = mappedFile.commit(commitLeastPages); 6 long where = mappedFile.getFileFromOffset() + offset; 7 result = where == this.committedWhere; 8 this.committedWhere = where; 9 } 10 11 return result; 12 }
這裡和mappedFileQueue的flush方法很相似,通過committedWhere尋找MappedFile
然後調用MappedFile的commit方法:
1 public int commit(final int commitLeastPages) { 2 if (writeBuffer == null) { 3 //no need to commit data to file channel, so just regard wrotePosition as committedPosition. 4 return this.wrotePosition.get(); 5 } 6 if (this.isAbleToCommit(commitLeastPages)) { 7 if (this.hold()) { 8 commit0(commitLeastPages); 9 this.release(); 10 } else { 11 log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); 12 } 13 } 14 15 // All dirty data has been committed to FileChannel. 16 if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) { 17 this.transientStorePool.returnBuffer(writeBuffer); 18 this.writeBuffer = null; 19 } 20 21 return this.committedPosition.get(); 22 }
依舊和MappedFile的flush方法很相似,在isAbleToCommit檢查完page後調用commit0方法
MappedFile的commit0方法:
1 protected void commit0(final int commitLeastPages) { 2 int writePos = this.wrotePosition.get(); 3 int lastCommittedPosition = this.committedPosition.get(); 4 5 if (writePos - this.committedPosition.get() > 0) { 6 try { 7 ByteBuffer byteBuffer = writeBuffer.slice(); 8 byteBuffer.position(lastCommittedPosition); 9 byteBuffer.limit(writePos); 10 this.fileChannel.position(lastCommittedPosition); 11 this.fileChannel.write(byteBuffer); 12 this.committedPosition.set(writePos); 13 } catch (Throwable e) { 14 log.error("Error occurred when commit data to FileChannel.", e); 15 } 16 } 17 }
中說過,當使用這種方式時,會先將消息緩存在writeBuffer中而不是之前的mappedByteBuffer
這裡就可以清楚地看到將writeBuffer中從lastCommittedPosition(上次提交位置)開始到writePos(緩存消息結束位置)的內容緩存到了fileChannel中相同的位置,並沒有寫入磁碟
在緩存到fileChannel後,會更新committedPosition值
回到commit方法,在向fileCfihannel緩存完畢後,會檢查committedPosition是否達到了fileSize,也就是判斷writeBuffer中的內容是不是去全部提交完畢
若是全部提交,需要通過transientStorePool的returnBuffer方法來回收利用writeBuffer
transientStorePool其實是一個雙向隊列,由CommitLog來管理
TransientStorePool:
1 public class TransientStorePool { 2 private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); 3 4 private final int poolSize; 5 private final int fileSize; 6 private final Deque<ByteBuffer> availableBuffers; 7 private final MessageStoreConfig storeConfig; 8 9 public TransientStorePool(final MessageStoreConfig storeConfig) { 10 this.storeConfig = storeConfig; 11 this.poolSize = storeConfig.getTransientStorePoolSize(); 12 this.fileSize = storeConfig.getMapedFileSizeCommitLog(); 13 this.availableBuffers = new ConcurrentLinkedDeque<>(); 14 } 15 ...... 16 }
returnBuffer方法:
1 public void returnBuffer(ByteBuffer byteBuffer) { 2 byteBuffer.position(0); 3 byteBuffer.limit(fileSize); 4 this.availableBuffers.offerFirst(byteBuffer); 5 }
這裡就可以清楚地看到byteBuffer確實被回收了
回到MappedFileQueue的commit方法:
1 public boolean commit(final int commitLeastPages) { 2 boolean result = true; 3 MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0); 4 if (mappedFile != null) { 5