一、說明 之前項目中一直使用ConcurrentLinkedQueue做為緩衝隊列(主要是單個項目內,單條改批量的場景,多個項目間使用的是rocketmq),雖然用著方便但是是純記憶體的, 如果項目發生異常崩潰記憶體隊列中的數據就會全部丟失(只能從日誌中恢復)。所以一直想找一個簡單高效支持持久化的嵌入式 ...
一、說明
之前項目中一直使用ConcurrentLinkedQueue做為緩衝隊列(主要是單個項目內,單條改批量的場景,多個項目間使用的是rocketmq),雖然用著方便但是是純記憶體的,
如果項目發生異常崩潰記憶體隊列中的數據就會全部丟失(只能從日誌中恢復)。所以一直想找一個簡單高效支持持久化的嵌入式消息隊列。中間用過activemq的嵌入模式,
雖然是支持持久化了,但是配置起來很繁瑣,用起來也不簡單,性能相比來說也不太行。
後來偶然發現了FQueue,項目地址:https://github.com/tietang/fqueue
看了看項目源碼,純java編寫,總共沒幾個類。完全可以改造成我想要的 簡單高效支持持久化的嵌入式消息隊列。
二、改造
1、因為是要做成嵌入式的,所以memcached協議相關的代碼都刪除了。
2、預創建文件刪除了,還有一些零零碎碎的改動。(好幾年了,記不清了)。
3、相較於原代碼,改動最大的就是鎖的部分,FQueue 讀和寫使用的是同一把鎖,
我改成了讀和寫使用不同的鎖,只在文件切換的時候使用同一把鎖。性能大概提示了百分之20左右(本來就很快,錦上添花)。
4、添加了記憶體隊列,這個主要解決同一個機器創建了大量隊列(上千)時,隊列消息消費較快,因為使用了記憶體映射磁碟(每隔10ms就會調用force()同步磁碟),
頻繁操作磁碟導致磁碟io過高的問題。預設情況下隊列大小超過50時才會寫入持久化隊列。可以在項目啟動時調用SMQ.setting(String dbPath, int logSize, int memoryQueueSize)
進行設置。
三、使用
1、說明
目前是集成在我個人的工具類項目中的,已發佈到中央倉庫。項目地址:https://github.com/shenbururen/sun-utils
該項目強依賴hutool,算是個人對hutool的個性化的擴展。如果不想依賴該項目,只想單純的使用SMQ,可以將源碼中 cn.sanenen.queue包複製出來,單獨使用。
2、maven引用
<!-- https://mvnrepository.com/artifact/cn.sanenen/sun-utils --> <dependency> <groupId>cn.sanenen</groupId> <artifactId>sun-utils</artifactId> <version>2.3.0</version> </dependency>
3、調用
SMQ使用時只有三個方法,向隊列放入數據、從隊列取出數據、獲取隊列大小(一般只在監控隊列是否積壓時使用,判斷隊列是否有數據,使用獲取隊列數據是否為null進行判斷)。
我一般是寫一個單獨的類,通過靜態方法調用。
/** * 本地持久化記憶體隊列 */ public class MsgQueue { private static final String testDataTopic = "testData"; /** * 向隊列放入數據,支持多線程。 */ public static void putTestData(TestData msg) { SMQ.push(testDataTopic, JSON.toJSONString(msg)); } /** * 從隊列取出數據,支持多線程。 */ public static TestData getTestData() { String poll = SMQ.pop(testDataTopic); if (StrUtil.isNotBlank(poll)) { return JSON.parseObject(poll, TestData.class); } return null; } /** * 獲取隊列大小 */ public static long getTestDataSize(){ return SMQ.size(testDataTopic); } }
四、註意事項
1、預設會在項目目錄下生成一個smq的文件夾用來存放隊列數據。同一個smq的文件夾同時只可被一個項目使用。
2、SMQ.setting(String dbPath, int logSize, int memoryQueueSize)
dbPath文件存儲目錄,預設是smq,會在項目目錄下創建一個smq的目錄。(還沒測試過絕對路徑)。
logSize屬性只可以在項目最開始時設置,之後不可以再設置不同的值。(也可以將生成的smq文件夾刪除後重新啟動進行設置)。
memoryQueueSize是記憶體隊列大小,預設是50,隊列數據積壓超過memoryQueueSize後才會寫入持久化隊列。(目前memoryQueueSize為0時還是會創建記憶體隊列,這裡之後會優化,不影響使用。)
項目使用了addShutdownHook,會在項目關閉時將記憶體隊列消息寫入持久化隊列。結束項目時使用kill -15 不要用 -9。否則可能造成消息丟失。
建議都使用預設的,也就是不要調用這個方法。避免調用出現問題。
3、最好使用在不是要求百分百消息不丟失的場景。(在項目異常停止、伺服器停電關機時,有概率丟失消息。)
4、目前已經使用兩年多。