本節介紹記憶體映射文件,利用它,我們實現一個簡單的、持久化的、可跨程式協作的消息隊列,怎麼實現呢? ...
本節介紹記憶體映射文件,記憶體映射文件不是Java引入的概念,而是操作系統提供的一種功能,大部分操作系統都支持。
我們先來介紹記憶體映射文件的基本概念,它是什麼,能解決什麼問題,然後我們介紹如何在Java中使用,我們會設計和實現一個簡單的、持久化的、跨程式的消息隊列來演示記憶體映射文件的應用。
基本概念
所謂記憶體映射文件,就是將文件映射到記憶體,文件對應於記憶體中的一個位元組數組,對文件的操作變為對這個位元組數組的操作,而位元組數組的操作直接映射到文件上。這種映射可以是映射文件全部區域,也可以是只映射一部分區域。
不過,這種映射是操作系統提供的一種假象,文件一般不會馬上載入到記憶體,操作系統只是記錄下了這回事,當實際發生讀寫時,才會按需載入。操作系統一般是按頁載入的,頁可以理解為就是一塊,頁的大小與操作系統和硬體相關,典型的配置可能是4K, 8K等,當操作系統發現讀寫區域不在記憶體時,就會載入該區域對應的一個頁到記憶體。
這種按需載入的方式,使得記憶體映射文件可以方便處理非常大的文件,記憶體放不下整個文件也不要緊,操作系統會自動進行處理,將需要的內容讀到記憶體,將修改的內容保存到硬碟,將不再使用的記憶體釋放。
在應用程式寫的時候,它寫的是記憶體中的位元組數組,這個內容什麼時候同步到文件上呢?這個時機是不確定的,由操作系統決定,不過,只要操作系統不崩潰,操作系統會保證同步到文件上,即使映射這個文件的應用程式已經退出了。
在一般的文件讀寫中,會有兩次數據拷貝,一次是從硬碟拷貝到操作系統內核,另一次是從操作系統內核拷貝到用戶態的應用程式。而在記憶體映射文件中,一般情況下,只有一次拷貝,且記憶體分配在操作系統內核,應用程式訪問的就是操作系統的內核記憶體空間,這顯然要比普通的讀寫效率更高。
記憶體映射文件的另一個重要特點是,它可以被多個不同的應用程式共用,多個程式可以映射同一個文件,映射到同一塊記憶體區域,一個程式對記憶體的修改,可以讓其他程式也看到,這使得它特別適合用於不同應用程式之間的通信。
操作系統自身在載入可執行文件的時候,一般都利用了記憶體映射文件,比如:
- 按需載入代碼,只有當前運行的代碼在記憶體,其他暫時用不到的代碼還在硬碟
- 同時啟動多次同一個可執行文件,文件代碼在記憶體也只有一份
- 不同應用程式共用的動態鏈接庫代碼在記憶體也只有一份
記憶體映射文件也有局限性,比如,它不太適合處理小文件,它是按頁分配記憶體的,對於小文件,會浪費空間,另外,映射文件要消耗一定的操作系統資源,初始化比較慢。
簡單總結下,對於一般的文件讀寫不需要使用記憶體映射文件,但如果處理的是大文件,要求極高的讀寫效率,比如資料庫系統,或者需要在不同程式間進行共用和通信,那就可以考慮記憶體映射文件。
理解了記憶體映射文件的基本概念,接下來,我們看怎麼在Java中使用它。
用法
映射文件
記憶體映射文件需要通過FileInputStream/FileOutputStream或RandomAccessFile,它們都有一個方法:
public FileChannel getChannel()
FileChannel有如下方法:
public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException
map方法將當前文件映射到記憶體,映射的結果就是一個MappedByteBuffer對象,它代表記憶體中的位元組數組,待會我們再來詳細看它。map有三個參數,mode表示映射模式,positon表示映射的起始位置,size表示長度。
mode有三個取值:
- MapMode.READ_ONLY:只讀
- MapMode.READ_WRITE:既讀也寫
- MapMode.PRIVATE:私有模式,更改不反映到文件,也不被其他程式看到
這個模式受限於背後的流或RandomAccessFile,比如,對於FileInputStream,或者RandomAccessFile但打開模式是"r",那mode就不能設為MapMode.READ_WRITE,否則會拋出異常。
如果映射的區域超過了現有文件的範圍,則文件會自動擴展,擴展出的區域位元組內容為0。
映射完成後,文件就可以關閉了,後續對文件的讀寫可以通過MappedByteBuffer。
看段代碼,比如以讀寫模式映射文件"abc.dat",代碼可以為:
RandomAccessFile file = new RandomAccessFile("abc.dat","rw"); try { MappedByteBuffer buf = file.getChannel().map(MapMode.READ_WRITE, 0, file.length()); //使用buf... } catch (IOException e) { e.printStackTrace(); }finally{ file.close(); }
MappedByteBuffer
怎麼來使用MappedByteBuffer呢?它是ByteBuffer的子類,而ByteBuffer是Buffer的子類。ByteBuffer和Buffer不只是給記憶體映射文件提供的,它們是Java NIO中操作數據的一種方式,用於很多地方,方法也比較多,我們只介紹一些主要相關的。
ByteBuffer可以簡單理解為就是封裝了一個位元組數組,這個位元組數組的長度是不可變的,在記憶體映射文件中,這個長度由map方法中的參數size決定。
ByteBuffer有一個基本屬性position,表示當前讀寫位置,這個位置可以改變,相關方法是:
//獲取當前讀寫位置 public final int position() //修改當前讀寫位置 public final Buffer position(int newPosition)
ByteBuffer中有很多基於當前位置讀寫數據的方法,如:
//從當前位置獲取一個位元組 public abstract byte get(); //從當前位置拷貝dst.length長度的位元組到dst public ByteBuffer get(byte[] dst) //從當前位置讀取一個int public abstract int getInt(); //從當前位置讀取一個double public abstract double getDouble(); //將位元組數組src寫入當前位置 public final ByteBuffer put(byte[] src) //將long類型的value寫入當前位置 public abstract ByteBuffer putLong(long value);
這些方法在讀寫後,都會自動增加position。
與這些方法相對應的,還有一組方法,可以在參數中直接指定position,比如:
//從index處讀取一個int public abstract int getInt(int index); //從index處讀取一個double public abstract double getDouble(int index); //在index處寫入一個double public abstract ByteBuffer putDouble(int index, double value); //在index處寫入一個long public abstract ByteBuffer putLong(int index, long value);
這些方法在讀寫時,不會改變當前讀寫位置position。
MappedByteBuffer自己還定義了一些方法:
//檢查文件內容是否真實載入到了記憶體,這個值是一個參考值,不一定精確 public final boolean isLoaded() //儘量將文件內容載入到記憶體 public final MappedByteBuffer load() //將對記憶體的修改強制同步到硬碟上 public final MappedByteBuffer force()
消息隊列
瞭解了記憶體映射文件的用法,接下來,我們來看怎麼用它設計和實現一個簡單的消息隊列,我們稱之為BasicQueue。
功能
BasicQueue是一個先進先出的迴圈隊列,長度固定,介面主要是出隊和入隊,與之前介紹的容器類的區別是:
- 消息持久化保存在文件中,重啟程式消息不會丟失
- 可以供不同的程式進行協作,典型場景是,有兩個不同的程式,一個是生產者,另一個是消費者,生成者只將消息放入隊列,而消費者只從隊列中取消息,兩個程式通過隊列進行協作,這種協作方式更靈活,相互依賴性小,是一種常見的協作方式。
BasicQueue的構造方法是:
public BasicQueue(String path, String queueName) throws IOException
path表示隊列所在的目錄,必須已存在,queueName表示隊列名,BasicQueue會使用以queueName開頭的兩個文件來保存隊列信息,一個尾碼是.data,保存實際的消息,另一個尾碼是.meta,保存元數據信息,如果這兩個文件存在,則會使用已有的隊列,否則會建立新隊列。
BasicQueue主要提供兩個方法,出隊和入隊,如下所示:
//入隊 public void enqueue(byte[] data) throws IOException //出隊 public byte[] dequeue() throws IOException
與上節介紹的BasicDB類似,消息格式也是byte數組。BasicQueue的隊列長度是有限的,如果滿了,調用enqueue會拋出異常,消息的最大長度也是有限的,不能超過1020,如果超了,也會拋出異常。如果隊列為空,dequeue返回null。
用法示例
BasicQueue的典型用法是生產者和消費者之間的協作,我們來看下簡單的示例代碼。生產者程式向隊列上放消息,每放一條,就隨機休息一會兒,代碼為:
public class Producer { public static void main(String[] args) throws InterruptedException { try { BasicQueue queue = new BasicQueue("./", "task"); int i = 0; Random rnd = new Random(); while (true) { String msg = new String("task " + (i++)); queue.enqueue(msg.getBytes("UTF-8")); System.out.println("produce: " + msg); Thread.sleep(rnd.nextInt(1000)); } } catch (IOException e) { e.printStackTrace(); } } }
消費者程式從隊列中取消息,如果隊列為空,也隨機睡一會兒,代碼為:
public class Consumer { public static void main(String[] args) throws InterruptedException { try { BasicQueue queue = new BasicQueue("./", "task"); Random rnd = new Random(); while (true) { byte[] bytes = queue.dequeue(); if (bytes == null) { Thread.sleep(rnd.nextInt(1000)); continue; } System.out.println("consume: " + new String(bytes, "UTF-8")); } } catch (IOException e) { e.printStackTrace(); } } }
假定這兩個程式的當前目錄一樣,它們會使用同樣的隊列"task"。同時運行這兩個程式,會看到它們的輸出交替出現。
設計
我們採用如下簡單方式來設計BasicQueue:
- 使用兩個文件來保存消息隊列,一個為數據文件,尾碼為.data,一個是元數據文件.meta。
- 在.data文件中使用固定長度存儲每條信息,長度為1024,前4個位元組為實際長度,後面是實際內容,每條消息的最大長度不能超過1020。
- 在.meta文件中保存隊列頭和尾,指向.data文件中的位置,初始都是0,入隊增加尾,出隊增加頭,到結尾時,再從0開始,模擬迴圈隊列。
- 為了區分隊列滿和空的狀態,始終留一個位置不保存數據,當隊列頭和尾一樣的時候表示隊列為空,當隊列尾的下一個位置是隊列頭的時候表示隊列滿。
基本設計如下圖所示:
為簡化起見,我們暫不考慮由於併發訪問等引起的一致性問題。
實現消息隊列
下麵來看BasicQueue的具體實現代碼。
常量定義
BasicQueue中定義瞭如下常量,名稱和含義如下:
// 隊列最多消息個數,實際個數還會減1 private static final int MAX_MSG_NUM = 1020*1024; // 消息體最大長度 private static final int MAX_MSG_BODY_SIZE = 1020; // 每條消息占用的空間 private static final int MSG_SIZE = MAX_MSG_BODY_SIZE + 4; // 隊列消息體數據文件大小 private static final int DATA_FILE_SIZE = MAX_MSG_NUM * MSG_SIZE; // 隊列元數據文件大小 (head + tail) private static final int META_SIZE = 8;
內部組成
BasicQueue的內部成員主要就是兩個MappedByteBuffer,分別表示數據和元數據:
private MappedByteBuffer dataBuf; private MappedByteBuffer metaBuf;
構造方法
BasicQueue的構造方法代碼是:
public BasicQueue(String path, String queueName) throws IOException { if (path.endsWith(File.separator)) { path += File.separator; } RandomAccessFile dataFile = null; RandomAccessFile metaFile = null; try { dataFile = new RandomAccessFile(path + queueName + ".data", "rw"); metaFile = new RandomAccessFile(path + queueName + ".meta", "rw"); dataBuf = dataFile.getChannel().map(MapMode.READ_WRITE, 0, DATA_FILE_SIZE); metaBuf = metaFile.getChannel().map(MapMode.READ_WRITE, 0, META_SIZE); } finally { if (dataFile != null) { dataFile.close(); } if (metaFile != null) { metaFile.close(); } } }
輔助方法
為了方便訪問和修改隊列頭尾指針,我們有如下方法:
private int head() { return metaBuf.getInt(0); } private void head(int newHead) { metaBuf.putInt(0, newHead); } private int tail() { return metaBuf.getInt(4); } private void tail(int newTail) { metaBuf.putInt(4, newTail); }
為了便於判斷隊列是空還是滿,我們有如下方法:
private boolean isEmpty(){ return head() == tail(); } private boolean isFull(){ return ((tail() + MSG_SIZE) % DATA_FILE_SIZE) == head(); }
入隊
代碼為:
public void enqueue(byte[] data) throws IOException { if (data.length > MAX_MSG_BODY_SIZE) { throw new IllegalArgumentException("msg size is " + data.length + ", while maximum allowed length is " + MAX_MSG_BODY_SIZE); } if (isFull()) { throw new IllegalStateException("queue is full"); } int tail = tail(); dataBuf.position(tail); dataBuf.putInt(data.length); dataBuf.put(data); if (tail + MSG_SIZE >= DATA_FILE_SIZE) { tail(0); } else { tail(tail + MSG_SIZE); } }
基本邏輯是:
- 如果消息太長或隊列滿,拋出異常。
- 找到隊列尾,定位到隊列尾,寫消息長度,寫實際數據。
- 更新隊列尾指針,如果已到文件尾,再從頭開始。
出隊
代碼為:
public byte[] dequeue() throws IOException { if (isEmpty()) { return null; } int head = head(); dataBuf.position(head); int length = dataBuf.getInt(); byte[] data = new byte[length]; dataBuf.get(data); if (head + MSG_SIZE >= DATA_FILE_SIZE) { head(0); } else { head(head + MSG_SIZE); } return data; }
基本邏輯是:
- 如果隊列為空,返回null。
- 找到隊列頭,定位到隊列頭,讀消息長度,讀實際數據。
- 更新隊列頭指針,如果已到文件尾,再從頭開始。
- 最後返回實際數據
小結
本節介紹了記憶體映射文件的基本概念及在Java中的的用法,在日常普通的文件讀寫中,我們用到的比較少,但在一些系統程式中,它卻是經常被用到的一把利器,可以高效的讀寫大文件,且能實現不同程式間的共用和通信。
利用記憶體映射文件,我們設計和實現了一個簡單的消息隊列,消息可以持久化,可以實現跨程式的生產者/消費者通信,我們演示了這個消息隊列的功能、用法、設計和實現代碼。
前面幾節,我們多次提到過序列化的概念,它到底是什麼呢?
----------------
未完待續,查看最新文章,敬請關註微信公眾號“老馬說編程”(掃描下方二維碼),從入門到高級,深入淺出,老馬和你一起探索Java編程及電腦技術的本質。用心原創,保留所有版權。