RocketMQ中Broker的刷盤源碼分析

来源:https://www.cnblogs.com/a526583280/archive/2019/08/07/11312750.html
-Advertisement-
Play Games

上一篇博客的最後簡單提了下CommitLog的刷盤 【RocketMQ中Broker的消息存儲源碼分析】 (這篇博客和上一篇有很大的聯繫) Broker的CommitLog刷盤會啟動一個線程,不停地將緩衝區的內容寫入磁碟(CommitLog文件)中,主要分為非同步刷盤和同步刷盤 非同步刷盤又可以分為兩種 ...


上一篇博客的最後簡單提了下CommitLog的刷盤  【RocketMQ中Broker的消息存儲源碼分析】 (這篇博客和上一篇有很大的聯繫)


Broker的CommitLog刷盤會啟動一個線程,不停地將緩衝區的內容寫入磁碟(CommitLog文件)中,主要分為非同步刷盤和同步刷盤

非同步刷盤又可以分為兩種方式:
①緩存到mappedByteBuffer -> 寫入磁碟(包括同步刷盤)
②緩存到writeBuffer -> 緩存到fileChannel -> 寫入磁碟 (前面說過的開啟記憶體位元組緩衝區情況下)

 

CommitLog的兩種刷盤模式:

1 public enum FlushDiskType {
2     SYNC_FLUSH,
3     ASYNC_FLUSH
4 }

同步和非同步,同步刷盤由GroupCommitService實現,非同步刷盤由FlushRealTimeService實現,預設採用非同步刷盤

在採用非同步刷盤的模式下,若是開啟記憶體位元組緩衝區,那麼會在FlushRealTimeService的基礎上開啟CommitRealTimeService

 

同步刷盤:

啟動GroupCommitService線程:

 1 public void run() {
 2     CommitLog.log.info(this.getServiceName() + " service started");
 3 
 4     while (!this.isStopped()) {
 5         try {
 6             this.waitForRunning(10);
 7             this.doCommit();
 8         } catch (Exception e) {
 9             CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
10         }
11     }
12 
13     // Under normal circumstances shutdown, wait for the arrival of the
14     // request, and then flush
15     try {
16         Thread.sleep(10);
17     } catch (InterruptedException e) {
18         CommitLog.log.warn("GroupCommitService Exception, ", e);
19     }
20 
21     synchronized (this) {
22         this.swapRequests();
23     }
24 
25     this.doCommit();
26 
27     CommitLog.log.info(this.getServiceName() + " service end");
28 }

通過迴圈中的doCommit不斷地進行刷盤


doCommit方法:

 1 private void doCommit() {
 2     synchronized (this.requestsRead) {
 3         if (!this.requestsRead.isEmpty()) {
 4             for (GroupCommitRequest req : this.requestsRead) {
 5                 // There may be a message in the next file, so a maximum of
 6                 // two times the flush
 7                 boolean flushOK = false;
 8                 for (int i = 0; i < 2 && !flushOK; i++) {
 9                     flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
10 
11                     if (!flushOK) {
12                         CommitLog.this.mappedFileQueue.flush(0);
13                     }
14                 }
15 
16                 req.wakeupCustomer(flushOK);
17             }
18 
19             long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
20             if (storeTimestamp > 0) {
21                 CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
22             }
23 
24             this.requestsRead.clear();
25         } else {
26             // Because of individual messages is set to not sync flush, it
27             // will come to this process
28             CommitLog.this.mappedFileQueue.flush(0);
29         }
30     }
31 }

其中在GroupCommitService中管理著兩張List:

1 private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
2 private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

GroupCommitRequest中封裝了一個Offset

1 private final long nextOffset;

 


這裡就需要看到上一篇博客結尾提到的handleDiskFlush方法:

 1 public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
 2     // Synchronization flush
 3     if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
 4         final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
 5         if (messageExt.isWaitStoreMsgOK()) {
 6             GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
 7             service.putRequest(request);
 8             boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
 9             if (!flushOK) {
10                 log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
11                     + " client address: " + messageExt.getBornHostString());
12                 putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
13             }
14         } else {
15             service.wakeup();
16         }
17     }
18     // Asynchronous flush
19     else {
20         if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
21             flushCommitLogService.wakeup();
22         } else {
23             commitLogService.wakeup();
24         }
25     }
26 }

這個方法的調用發生在Broker接收到來自Producer的消息,並且完成了向ByteBuffer的寫入

可以看到,在同步刷盤SYNC_FLUSH模式下,會從AppendMessageResult 中取出WroteOffset以及WroteBytes從而計算出nextOffset,把這個nextOffset封裝到GroupCommitRequest中,然後通過GroupCommitService 的putRequest方法,將GroupCommitRequest添加到requestsWrite這個List中
putRequest方法:

1 public synchronized void putRequest(final GroupCommitRequest request) {
2     synchronized (this.requestsWrite) {
3         this.requestsWrite.add(request);
4     }
5     if (hasNotified.compareAndSet(false, true)) {
6         waitPoint.countDown(); // notify
7     }
8 }

在完成List的add操作後,會通過CAS操作修改hasNotified這個原子化的Boolean值,同時通過waitPoint的countDown進行喚醒操作,在後面會有用


由於這裡這裡是同步刷盤,所以需要通過GroupCommitRequest的waitForFlush方法,在超時時間內等待該記錄對應的刷盤完成
而非同步刷盤會通過wakeup方法喚醒刷盤任務,並沒有進行等待,這就是二者區別


回到doCommit方法中,這時會發現這裡是對requestsRead這條List進行的操作,而剛纔是將記錄存放在requestsWrite這條List中的
這就和在run方法中的waitForRunning方法有關了:

 1 protected void waitForRunning(long interval) {
 2    if (hasNotified.compareAndSet(true, false)) {
 3         this.onWaitEnd();
 4         return;
 5     }
 6 
 7     //entry to wait
 8     waitPoint.reset();
 9 
10     try {
11         waitPoint.await(interval, TimeUnit.MILLISECONDS);
12     } catch (InterruptedException e) {
13         log.error("Interrupted", e);
14     } finally {
15         hasNotified.set(false);
16         this.onWaitEnd();
17     }
18 }

這裡通過CAS操作修改hasNotified值,從而調用onWaitEnd方法;如果修改失敗,則因為await進入阻塞,等待上面所說的putRequest方法將其喚醒,也就是說當Producer發送的消息被緩存成功後,調用handleDiskFlush方法後,喚醒刷盤線工作,當然刷盤線程在達到超時時間interval後也會喚醒


再來看看onWaitEnd方法:

1 protected void onWaitEnd() {
2     this.swapRequests();
3 }
4 
5 private void swapRequests() {
6     List<GroupCommitRequest> tmp = this.requestsWrite;
7     this.requestsWrite = this.requestsRead;
8     this.requestsRead = tmp;
9 }

可以看到,這裡是將兩個List進行了交換

這是一個非常有趣的做法,如果熟悉JVM的話,有沒有覺得這其實很像新生代的複製演算法!
當刷盤線程阻塞的時候,requestsWrite中會填充記錄,當刷盤線程被喚醒工作的時候,首先會將requestsWrite和requestsRead進行交換,那麼此時的記錄就是從requestsRead中讀取的了,而同時requestsWrite會變為空的List,消息記錄就會往這個空的List中填充,如此往複

可以看到doCommit方法中,當requestsRead不為空的時候,在最後會調用requestsRead的clear方法,由此證明瞭我上面的說法

 

仔細來看看是如何進行刷盤的:

 1 for (GroupCommitRequest req : this.requestsRead) {
 2    // There may be a message in the next file, so a maximum of
 3     // two times the flush
 4     boolean flushOK = false;
 5     for (int i = 0; i < 2 && !flushOK; i++) {
 6         flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
 7 
 8         if (!flushOK) {
 9             CommitLog.this.mappedFileQueue.flush(0);
10         }
11     }
12 
13     req.wakeupCustomer(flushOK);
14 }

通過遍歷requestsRead,可以到得到GroupCommitRequest封裝的NextOffset

其中flushedWhere是用來記錄上一次刷盤完成後的offset,若是上一次的刷盤位置大於等於NextOffset,就說明從NextOffset位置起始已經被刷新過了,不需要刷新,否則調用mappedFileQueue的flush方法進行刷盤

MappedFileQueue的flush方法:

 1 public boolean flush(final int flushLeastPages) {
 2     boolean result = true;
 3     MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
 4     if (mappedFile != null) {
 5         long tmpTimeStamp = mappedFile.getStoreTimestamp();
 6         int offset = mappedFile.flush(flushLeastPages);
 7         long where = mappedFile.getFileFromOffset() + offset;
 8         result = where == this.flushedWhere;
 9         this.flushedWhere = where;
10         if (0 == flushLeastPages) {
11             this.storeTimestamp = tmpTimeStamp;
12         }
13     }
14 
15     return result;
16 }

這裡首先根據flushedWhere上一次刷盤完成後的offset,通過findMappedFileByOffset方法,找到CommitLog文件的映射MappedFile
有關MappedFile及其相關操作在我之前的博客中介紹過很多次,就不再累贅


再找到MappedFile後,調用其flush方法:

MappedFile的flush方法:

 1 public int flush(final int flushLeastPages) {
 2     if (this.isAbleToFlush(flushLeastPages)) {
 3         if (this.hold()) {
 4             int value = getReadPosition();
 5 
 6             try {
 7                 //We only append data to fileChannel or mappedByteBuffer, never both.
 8                 if (writeBuffer != null || this.fileChannel.position() != 0) {
 9                     this.fileChannel.force(false);
10                 } else {
11                     this.mappedByteBuffer.force();
12                 }
13             } catch (Throwable e) {
14                 log.error("Error occurred when force data to disk.", e);
15             }
16 
17             this.flushedPosition.set(value);
18             this.release();
19         } else {
20             log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
21             this.flushedPosition.set(getReadPosition());
22         }
23     }
24     return this.getFlushedPosition();
25 }


首先isAbleToFlush方法:

 1 private boolean isAbleToFlush(final int flushLeastPages) {
 2     int flush = this.flushedPosition.get();
 3     int write = getReadPosition();
 4 
 5     if (this.isFull()) {
 6         return true;
 7     }
 8 
 9     if (flushLeastPages > 0) {
10         return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
11     }
12 
13     return write > flush;
14 }

其中flush記錄的是上一次完成刷新後的位置,write記錄的是當前消息內容寫入後的位置
當flushLeastPages 大於0的時候,通過:

1 return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;

可以計算出是否滿足page的要求,其中OS_PAGE_SIZE是4K,也就是說1個page大小是4k

由於這裡是同步刷盤,flushLeastPages是0,不對page要求,只要有緩存有內容就會刷盤;但是在非同步刷盤中,flushLeastPages是4,也就是說,只有當緩存的消息至少是4(page個數)*4K(page大小)= 16K時,非同步刷盤才會將緩存寫入文件

 

回到MappedFile的flush方法,在通過isAbleToFlush檢查完寫入要求後

 1 int value = getReadPosition();
 2 try {
 3     //We only append data to fileChannel or mappedByteBuffer, never both.
 4     if (writeBuffer != null || this.fileChannel.position() != 0) {
 5         this.fileChannel.force(false);
 6     } else {
 7         this.mappedByteBuffer.force();
 8     }
 9 } catch (Throwable e) {
10     log.error("Error occurred when force data to disk.", e);
11 }
12 
13 this.flushedPosition.set(value);

首先通過getReadPosition獲取當前消息內容寫入後的位置,由於是同步刷盤,所以這裡調用mappedByteBuffer的force方法,通過JDK的NIO操作,將mappedByteBuffer緩存中的數據寫入CommitLog文件中
最後更新flushedPosition的值


再回到MappedFileQueue的flush方法,在完成MappedFile的flush後,還需要更新flushedWhere的值

此時緩存中的數據完成了持久化,同步刷盤結束

 

非同步刷盤:

①FlushCommitLogService:

 1 public void run() {
 2     CommitLog.log.info(this.getServiceName() + " service started");
 3 
 4     while (!this.isStopped()) {
 5         boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
 6 
 7         int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
 8         int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
 9 
10         int flushPhysicQueueThoroughInterval =
11             CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
12 
13         boolean printFlushProgress = false;
14 
15         // Print flush progress
16         long currentTimeMillis = System.currentTimeMillis();
17         if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
18             this.lastFlushTimestamp = currentTimeMillis;
19             flushPhysicQueueLeastPages = 0;
20             printFlushProgress = (printTimes++ % 10) == 0;
21         }
22 
23         try {
24             if (flushCommitLogTimed) {
25                 Thread.sleep(interval);
26             } else {
27                 this.waitForRunning(interval);
28             }
29 
30             if (printFlushProgress) {
31                 this.printFlushProgress();
32             }
33 
34             long begin = System.currentTimeMillis();
35             CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
36             long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
37             if (storeTimestamp > 0) {
38                 CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
39             }
40             long past = System.currentTimeMillis() - begin;
41             if (past > 500) {
42                 log.info("Flush data to disk costs {} ms", past);
43             }
44         } catch (Throwable e) {
45             CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
46             this.printFlushProgress();
47         }
48     }
49 
50     // Normal shutdown, to ensure that all the flush before exit
51     boolean result = false;
52     for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
53         result = CommitLog.this.mappedFileQueue.flush(0);
54         CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
55     }
56 
57     this.printFlushProgress();
58 
59     CommitLog.log.info(this.getServiceName() + " service end");
60 }

flushCommitLogTimed:是否使用定時刷盤
interval:刷盤時間間隔,預設500ms
flushPhysicQueueLeastPages:page大小,預設4個
flushPhysicQueueThoroughInterval:徹底刷盤時間間隔,預設10s


首先根據lastFlushTimestamp(上一次刷盤時間)+ flushPhysicQueueThoroughInterval和當前時間比較,判斷是否需要進行一次徹底刷盤,若達到了需要則將flushPhysicQueueLeastPages置為0


接著根據flushCommitLogTimed判斷
當flushCommitLogTimed為true,使用sleep等待500ms
當flushCommitLogTimed為false,調用waitForRunning在超時時間為500ms下阻塞,其喚醒條件也就是在handleDiskFlush中的wakeup喚醒

最後,和同步刷盤一樣,調用mappedFileQueue的flush方法
只不過,這裡的flushPhysicQueueLeastPages決定了其是進行徹底刷新,還是按4page(16K)的標準刷新


②CommitRealTimeService
這種刷盤方式需要和FlushCommitLogService配合


CommitRealTimeService的run方法:

 1 public void run() {
 2    CommitLog.log.info(this.getServiceName() + " service started");
 3     while (!this.isStopped()) {
 4         int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
 5 
 6         int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
 7 
 8         int commitDataThoroughInterval =
 9             CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
10 
11         long begin = System.currentTimeMillis();
12         if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
13             this.lastCommitTimestamp = begin;
14             commitDataLeastPages = 0;
15         }
16 
17         try {
18             boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
19             long end = System.currentTimeMillis();
20             if (!result) {
21                 this.lastCommitTimestamp = end; // result = false means some data committed.
22                 //now wake up flush thread.
23                 flushCommitLogService.wakeup();
24             }
25 
26             if (end - begin > 500) {
27                 log.info("Commit data to file costs {} ms", end - begin);
28             }
29             this.waitForRunning(interval);
30         } catch (Throwable e) {
31             CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
32         }
33     }
34 
35     boolean result = false;
36     for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
37         result = CommitLog.this.mappedFileQueue.commit(0);
38         CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
39     }
40     CommitLog.log.info(this.getServiceName() + " service end");
41 }

這裡的邏輯和FlushCommitLogService中相似,之不過參數略有不同

interval:提交時間間隔,預設200ms
commitDataLeastPages:page大小,預設4個
commitDataThoroughInterval:提交完成時間間隔,預設200ms

基本和FlushCommitLogService相似,只不過調用了mappedFileQueue的commit方法

 1 public boolean commit(final int commitLeastPages) {
 2     boolean result = true;
 3     MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
 4     if (mappedFile != null) {
 5         int offset = mappedFile.commit(commitLeastPages);
 6         long where = mappedFile.getFileFromOffset() + offset;
 7         result = where == this.committedWhere;
 8         this.committedWhere = where;
 9     }
10 
11     return result;
12 }

這裡和mappedFileQueue的flush方法很相似,通過committedWhere尋找MappedFile

然後調用MappedFile的commit方法:

 1 public int commit(final int commitLeastPages) {
 2     if (writeBuffer == null) {
 3         //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
 4         return this.wrotePosition.get();
 5     }
 6     if (this.isAbleToCommit(commitLeastPages)) {
 7         if (this.hold()) {
 8             commit0(commitLeastPages);
 9             this.release();
10         } else {
11             log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
12         }
13     }
14 
15     // All dirty data has been committed to FileChannel.
16     if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
17         this.transientStorePool.returnBuffer(writeBuffer);
18         this.writeBuffer = null;
19     }
20 
21     return this.committedPosition.get();
22 }

依舊和MappedFile的flush方法很相似,在isAbleToCommit檢查完page後調用commit0方法


MappedFile的commit0方法:

 1 protected void commit0(final int commitLeastPages) {
 2     int writePos = this.wrotePosition.get();
 3     int lastCommittedPosition = this.committedPosition.get();
 4 
 5     if (writePos - this.committedPosition.get() > 0) {
 6         try {
 7             ByteBuffer byteBuffer = writeBuffer.slice();
 8             byteBuffer.position(lastCommittedPosition);
 9             byteBuffer.limit(writePos);
10             this.fileChannel.position(lastCommittedPosition);
11             this.fileChannel.write(byteBuffer);
12             this.committedPosition.set(writePos);
13         } catch (Throwable e) {
14             log.error("Error occurred when commit data to FileChannel.", e);
15         }
16     }
17 }

 【RocketMQ中Broker的消息存儲源碼分析】 

中說過,當使用這種方式時,會先將消息緩存在writeBuffer中而不是之前的mappedByteBuffer
這裡就可以清楚地看到將writeBuffer中從lastCommittedPosition(上次提交位置)開始到writePos(緩存消息結束位置)的內容緩存到了fileChannel中相同的位置,並沒有寫入磁碟
在緩存到fileChannel後,會更新committedPosition值


回到commit方法,在向fileCfihannel緩存完畢後,會檢查committedPosition是否達到了fileSize,也就是判斷writeBuffer中的內容是不是去全部提交完畢

若是全部提交,需要通過transientStorePool的returnBuffer方法來回收利用writeBuffer
transientStorePool其實是一個雙向隊列,由CommitLog來管理
TransientStorePool:

 1 public class TransientStorePool {
 2     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 3 
 4     private final int poolSize;
 5     private final int fileSize;
 6     private final Deque<ByteBuffer> availableBuffers;
 7     private final MessageStoreConfig storeConfig;
 8 
 9     public TransientStorePool(final MessageStoreConfig storeConfig) {
10         this.storeConfig = storeConfig;
11         this.poolSize = storeConfig.getTransientStorePoolSize();
12         this.fileSize = storeConfig.getMapedFileSizeCommitLog();
13         this.availableBuffers = new ConcurrentLinkedDeque<>();
14     }
15     ......
16 }


returnBuffer方法:

1 public void returnBuffer(ByteBuffer byteBuffer) {
2     byteBuffer.position(0);
3     byteBuffer.limit(fileSize);
4     this.availableBuffers.offerFirst(byteBuffer);
5 }

這裡就可以清楚地看到byteBuffer確實被回收了

 

回到MappedFileQueue的commit方法:

 1 public boolean commit(final int commitLeastPages) {
 2     boolean result = true;
 3     MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
 4     if (mappedFile != null) {
 5         	   

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 現在,很少有人和90年代一樣,自己去實現一個軟體的各個方面,也就是說,在工作中,和人溝通是必備的技能。那麼,作為一枚碼農,如何和他人溝通呢?這就要依靠本文的主題了——UML。 簡介 UML——Unified modeling language UML(統一建模語言),是一種用於軟體系統分析和設 ...
  • 迪米特原則定義 迪米特原則,也叫最少知道原則,即一個類應該 對自己依賴的類知道的越少越好 ,而你被依賴的類多麼複雜,對我都沒有關係。也就是說,對於別依賴的類來說,不管業務邏輯多麼複雜,都應該儘量封裝在類的內部;對外除了必備的public方法,不再泄露任何信息。 1.問題由來 我們知道,類和類是有耦合 ...
  • 最近同事在處理文件導入的時候需要把一批文件換成CSV的格式,但是直覺修改尾碼是不生效的,而且xlsx和xls的文件沒法直接換成CVS的文件,所以找了一下方式,並且自己實現了python的轉換方式。代碼如下 文件需要導入pandas 還要引入xlrd 代碼是基於python3.6的環境。 ...
  • 實例解讀面向對象核心,所有例子基於 C#,涉及我們實務中最常關心的問題: 1、封裝、繼承、多態; 2、抽象類、介面; 3、委托、事件。 ...
  • 前幾天和隔壁鄰居玩鬥地主被髮現了,牌被沒收了,鬥地主是鬥不了了,但我還想和鄰居玩耍。 想破腦袋終於讓我想到一個游戲,數獨!什麼叫數獨?數獨就是可以讓我趁老王不在的時候和隔壁鄰居一起玩耍的游戲! ...
  • 土壤濕度感測器和dh11會獲取數據,然後樹莓派會處理這些數據,讀出土壤溫濕度和空氣溫濕度,並將這些數據上傳到雲伺服器, 雲伺服器會將這些數據顯示在網頁上,同時樹莓派會根據這些數據來決定是否控制繼電器進而控制水泵來澆水,當然,水泵是否澆水也可以通過事先做的網頁看到。 ...
  • https://www.datamentor.io/r-programming/data-frame/ Check if a variable is a data frame or not We can check if a variable is a data frame or not using ...
  • 通過實現 Aware 介面,可以在 Spring 啟動時,調用介面定義的方法,將 Spring 底層的一些組件註入到自定義的 Bean 中。 下麵列出了幾個 Spring 在 Aware 介面基礎上,進行擴展的介面,分別會在創建 Bean 時直接執行,或者通過 BeanPostProcessor 間 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...