電腦程式的思維邏輯 (61) - 記憶體映射文件及其應用 - 實現一個簡單的消息隊列

来源:http://www.cnblogs.com/swiftma/archive/2017/01/03/6227209.html
-Advertisement-
Play Games

本節介紹記憶體映射文件,利用它,我們實現一個簡單的、持久化的、可跨程式協作的消息隊列,怎麼實現呢? ...


本節介紹記憶體映射文件,記憶體映射文件不是Java引入的概念,而是操作系統提供的一種功能,大部分操作系統都支持。

我們先來介紹記憶體映射文件的基本概念,它是什麼,能解決什麼問題,然後我們介紹如何在Java中使用,我們會設計和實現一個簡單的、持久化的、跨程式的消息隊列來演示記憶體映射文件的應用。

基本概念

所謂記憶體映射文件,就是將文件映射到記憶體,文件對應於記憶體中的一個位元組數組,對文件的操作變為對這個位元組數組的操作,而位元組數組的操作直接映射到文件上。這種映射可以是映射文件全部區域,也可以是只映射一部分區域。

不過,這種映射是操作系統提供的一種假象,文件一般不會馬上載入到記憶體,操作系統只是記錄下了這回事,當實際發生讀寫時,才會按需載入。操作系統一般是按頁載入的,頁可以理解為就是一塊,頁的大小與操作系統和硬體相關,典型的配置可能是4K, 8K等,當操作系統發現讀寫區域不在記憶體時,就會載入該區域對應的一個頁到記憶體。

這種按需載入的方式,使得記憶體映射文件可以方便處理非常大的文件,記憶體放不下整個文件也不要緊,操作系統會自動進行處理,將需要的內容讀到記憶體,將修改的內容保存到硬碟,將不再使用的記憶體釋放。

在應用程式寫的時候,它寫的是記憶體中的位元組數組,這個內容什麼時候同步到文件上呢?這個時機是不確定的,由操作系統決定,不過,只要操作系統不崩潰,操作系統會保證同步到文件上,即使映射這個文件的應用程式已經退出了。

在一般的文件讀寫中,會有兩次數據拷貝,一次是從硬碟拷貝到操作系統內核,另一次是從操作系統內核拷貝到用戶態的應用程式。而在記憶體映射文件中,一般情況下,只有一次拷貝,且記憶體分配在操作系統內核,應用程式訪問的就是操作系統的內核記憶體空間,這顯然要比普通的讀寫效率更高。

記憶體映射文件的另一個重要特點是,它可以被多個不同的應用程式共用,多個程式可以映射同一個文件,映射到同一塊記憶體區域,一個程式對記憶體的修改,可以讓其他程式也看到,這使得它特別適合用於不同應用程式之間的通信。

操作系統自身在載入可執行文件的時候,一般都利用了記憶體映射文件,比如:

  • 按需載入代碼,只有當前運行的代碼在記憶體,其他暫時用不到的代碼還在硬碟
  • 同時啟動多次同一個可執行文件,文件代碼在記憶體也只有一份
  • 不同應用程式共用的動態鏈接庫代碼在記憶體也只有一份 

記憶體映射文件也有局限性,比如,它不太適合處理小文件,它是按頁分配記憶體的,對於小文件,會浪費空間,另外,映射文件要消耗一定的操作系統資源,初始化比較慢。

簡單總結下,對於一般的文件讀寫不需要使用記憶體映射文件,但如果處理的是大文件,要求極高的讀寫效率,比如資料庫系統,或者需要在不同程式間進行共用和通信,那就可以考慮記憶體映射文件。

理解了記憶體映射文件的基本概念,接下來,我們看怎麼在Java中使用它。

用法

映射文件

記憶體映射文件需要通過FileInputStream/FileOutputStream或RandomAccessFile,它們都有一個方法:

public FileChannel getChannel()

FileChannel有如下方法:

public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException

map方法將當前文件映射到記憶體,映射的結果就是一個MappedByteBuffer對象,它代表記憶體中的位元組數組,待會我們再來詳細看它。map有三個參數,mode表示映射模式,positon表示映射的起始位置,size表示長度。

mode有三個取值:

  • MapMode.READ_ONLY:只讀
  • MapMode.READ_WRITE:既讀也寫
  • MapMode.PRIVATE:私有模式,更改不反映到文件,也不被其他程式看到 

這個模式受限於背後的流或RandomAccessFile,比如,對於FileInputStream,或者RandomAccessFile但打開模式是"r",那mode就不能設為MapMode.READ_WRITE,否則會拋出異常。

如果映射的區域超過了現有文件的範圍,則文件會自動擴展,擴展出的區域位元組內容為0。

映射完成後,文件就可以關閉了,後續對文件的讀寫可以通過MappedByteBuffer。

看段代碼,比如以讀寫模式映射文件"abc.dat",代碼可以為:

RandomAccessFile file = new RandomAccessFile("abc.dat","rw");
try {
    MappedByteBuffer buf = file.getChannel().map(MapMode.READ_WRITE, 0, file.length());
    //使用buf...
} catch (IOException e) {
    e.printStackTrace();
}finally{
    file.close();
}

MappedByteBuffer

怎麼來使用MappedByteBuffer呢?它是ByteBuffer的子類,而ByteBuffer是Buffer的子類。ByteBuffer和Buffer不只是給記憶體映射文件提供的,它們是Java NIO中操作數據的一種方式,用於很多地方,方法也比較多,我們只介紹一些主要相關的。

ByteBuffer可以簡單理解為就是封裝了一個位元組數組,這個位元組數組的長度是不可變的,在記憶體映射文件中,這個長度由map方法中的參數size決定。

ByteBuffer有一個基本屬性position,表示當前讀寫位置,這個位置可以改變,相關方法是:

//獲取當前讀寫位置
public final int position()
//修改當前讀寫位置
public final Buffer position(int newPosition)

ByteBuffer中有很多基於當前位置讀寫數據的方法,如:

//從當前位置獲取一個位元組
public abstract byte get();
//從當前位置拷貝dst.length長度的位元組到dst
public ByteBuffer get(byte[] dst)
//從當前位置讀取一個int
public abstract int getInt();
//從當前位置讀取一個double
public abstract double getDouble();
//將位元組數組src寫入當前位置
public final ByteBuffer put(byte[] src)
//將long類型的value寫入當前位置
public abstract ByteBuffer putLong(long value);

這些方法在讀寫後,都會自動增加position。

與這些方法相對應的,還有一組方法,可以在參數中直接指定position,比如:

//從index處讀取一個int
public abstract int getInt(int index);
//從index處讀取一個double
public abstract double getDouble(int index);
//在index處寫入一個double
public abstract ByteBuffer putDouble(int index, double value);
//在index處寫入一個long
public abstract ByteBuffer putLong(int index, long value);

這些方法在讀寫時,不會改變當前讀寫位置position。

MappedByteBuffer自己還定義了一些方法:

//檢查文件內容是否真實載入到了記憶體,這個值是一個參考值,不一定精確
public final boolean isLoaded()
//儘量將文件內容載入到記憶體
public final MappedByteBuffer load()
//將對記憶體的修改強制同步到硬碟上
public final MappedByteBuffer force()

消息隊列

瞭解了記憶體映射文件的用法,接下來,我們來看怎麼用它設計和實現一個簡單的消息隊列,我們稱之為BasicQueue。

功能

BasicQueue是一個先進先出的迴圈隊列,長度固定,介面主要是出隊和入隊,與之前介紹的容器類的區別是:

  • 消息持久化保存在文件中,重啟程式消息不會丟失
  • 可以供不同的程式進行協作,典型場景是,有兩個不同的程式,一個是生產者,另一個是消費者,生成者只將消息放入隊列,而消費者只從隊列中取消息,兩個程式通過隊列進行協作,這種協作方式更靈活,相互依賴性小,是一種常見的協作方式。

BasicQueue的構造方法是:

public BasicQueue(String path, String queueName) throws IOException

path表示隊列所在的目錄,必須已存在,queueName表示隊列名,BasicQueue會使用以queueName開頭的兩個文件來保存隊列信息,一個尾碼是.data,保存實際的消息,另一個尾碼是.meta,保存元數據信息,如果這兩個文件存在,則會使用已有的隊列,否則會建立新隊列。

BasicQueue主要提供兩個方法,出隊和入隊,如下所示:

//入隊
public void enqueue(byte[] data) throws IOException
//出隊
public byte[] dequeue() throws IOException

與上節介紹的BasicDB類似,消息格式也是byte數組。BasicQueue的隊列長度是有限的,如果滿了,調用enqueue會拋出異常,消息的最大長度也是有限的,不能超過1020,如果超了,也會拋出異常。如果隊列為空,dequeue返回null。

用法示例

BasicQueue的典型用法是生產者和消費者之間的協作,我們來看下簡單的示例代碼。生產者程式向隊列上放消息,每放一條,就隨機休息一會兒,代碼為:

public class Producer {
    public static void main(String[] args) throws InterruptedException {
        try {
            BasicQueue queue = new BasicQueue("./", "task");
            int i = 0;
            Random rnd = new Random();
            while (true) {
                String msg = new String("task " + (i++));
                queue.enqueue(msg.getBytes("UTF-8"));
                System.out.println("produce: " + msg);
                Thread.sleep(rnd.nextInt(1000));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

消費者程式從隊列中取消息,如果隊列為空,也隨機睡一會兒,代碼為:

public class Consumer {
    public static void main(String[] args) throws InterruptedException {
        try {
            BasicQueue queue = new BasicQueue("./", "task");
            Random rnd = new Random();
            while (true) {
                byte[] bytes = queue.dequeue();
                if (bytes == null) {
                    Thread.sleep(rnd.nextInt(1000));
                    continue;
                }
                System.out.println("consume: " + new String(bytes, "UTF-8"));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

假定這兩個程式的當前目錄一樣,它們會使用同樣的隊列"task"。同時運行這兩個程式,會看到它們的輸出交替出現。

設計

我們採用如下簡單方式來設計BasicQueue:

  • 使用兩個文件來保存消息隊列,一個為數據文件,尾碼為.data,一個是元數據文件.meta。
  • 在.data文件中使用固定長度存儲每條信息,長度為1024,前4個位元組為實際長度,後面是實際內容,每條消息的最大長度不能超過1020。
  • 在.meta文件中保存隊列頭和尾,指向.data文件中的位置,初始都是0,入隊增加尾,出隊增加頭,到結尾時,再從0開始,模擬迴圈隊列。
  • 為了區分隊列滿和空的狀態,始終留一個位置不保存數據,當隊列頭和尾一樣的時候表示隊列為空,當隊列尾的下一個位置是隊列頭的時候表示隊列滿。 

基本設計如下圖所示:

為簡化起見,我們暫不考慮由於併發訪問等引起的一致性問題。

實現消息隊列

下麵來看BasicQueue的具體實現代碼。

常量定義

BasicQueue中定義瞭如下常量,名稱和含義如下:

// 隊列最多消息個數,實際個數還會減1
private static final int MAX_MSG_NUM = 1020*1024;
// 消息體最大長度
private static final int MAX_MSG_BODY_SIZE = 1020;
// 每條消息占用的空間
private static final int MSG_SIZE = MAX_MSG_BODY_SIZE + 4;
// 隊列消息體數據文件大小
private static final int DATA_FILE_SIZE = MAX_MSG_NUM * MSG_SIZE;
// 隊列元數據文件大小 (head + tail)
private static final int META_SIZE = 8;

內部組成

BasicQueue的內部成員主要就是兩個MappedByteBuffer,分別表示數據和元數據:

private MappedByteBuffer dataBuf;
private MappedByteBuffer metaBuf; 

構造方法

BasicQueue的構造方法代碼是:

public BasicQueue(String path, String queueName) throws IOException {
    if (path.endsWith(File.separator)) {
        path += File.separator;
    }
    RandomAccessFile dataFile = null;
    RandomAccessFile metaFile = null;
    try {
        dataFile = new RandomAccessFile(path + queueName + ".data", "rw");
        metaFile = new RandomAccessFile(path + queueName + ".meta", "rw");

        dataBuf = dataFile.getChannel().map(MapMode.READ_WRITE, 0,
                DATA_FILE_SIZE);
        metaBuf = metaFile.getChannel().map(MapMode.READ_WRITE, 0,
                META_SIZE);
    } finally {
        if (dataFile != null) {
            dataFile.close();
        }
        if (metaFile != null) {
            metaFile.close();
        }
    }
}

輔助方法

為了方便訪問和修改隊列頭尾指針,我們有如下方法:

private int head() {
    return metaBuf.getInt(0);
}

private void head(int newHead) {
    metaBuf.putInt(0, newHead);
}

private int tail() {
    return metaBuf.getInt(4);
}

private void tail(int newTail) {
    metaBuf.putInt(4, newTail);
}

為了便於判斷隊列是空還是滿,我們有如下方法:

private boolean isEmpty(){
    return head() == tail();
}

private boolean isFull(){
    return ((tail() + MSG_SIZE) % DATA_FILE_SIZE) == head();
}

入隊

代碼為:

public void enqueue(byte[] data) throws IOException {
    if (data.length > MAX_MSG_BODY_SIZE) {
        throw new IllegalArgumentException("msg size is " + data.length
                + ", while maximum allowed length is " + MAX_MSG_BODY_SIZE);
    }
    if (isFull()) {
        throw new IllegalStateException("queue is full");
    }
    int tail = tail();
    dataBuf.position(tail);
    dataBuf.putInt(data.length);
    dataBuf.put(data);

    if (tail + MSG_SIZE >= DATA_FILE_SIZE) {
        tail(0);
    } else {
        tail(tail + MSG_SIZE);
    }
}

基本邏輯是:

  1. 如果消息太長或隊列滿,拋出異常。
  2. 找到隊列尾,定位到隊列尾,寫消息長度,寫實際數據。
  3. 更新隊列尾指針,如果已到文件尾,再從頭開始。 

出隊

代碼為:

public byte[] dequeue() throws IOException {
    if (isEmpty()) {
        return null;
    }
    int head = head();
    dataBuf.position(head);
    int length = dataBuf.getInt();
    byte[] data = new byte[length];
    dataBuf.get(data);

    if (head + MSG_SIZE >= DATA_FILE_SIZE) {
        head(0);
    } else {
        head(head + MSG_SIZE);
    }
    return data;
}

基本邏輯是:

  1. 如果隊列為空,返回null。
  2. 找到隊列頭,定位到隊列頭,讀消息長度,讀實際數據。
  3. 更新隊列頭指針,如果已到文件尾,再從頭開始。
  4. 最後返回實際數據 

小結

本節介紹了記憶體映射文件的基本概念及在Java中的的用法,在日常普通的文件讀寫中,我們用到的比較少,但在一些系統程式中,它卻是經常被用到的一把利器,可以高效的讀寫大文件,且能實現不同程式間的共用和通信。

利用記憶體映射文件,我們設計和實現了一個簡單的消息隊列,消息可以持久化,可以實現跨程式的生產者/消費者通信,我們演示了這個消息隊列的功能、用法、設計和實現代碼。

前面幾節,我們多次提到過序列化的概念,它到底是什麼呢?

----------------

未完待續,查看最新文章,敬請關註微信公眾號“老馬說編程”(掃描下方二維碼),從入門到高級,深入淺出,老馬和你一起探索Java編程及電腦技術的本質。用心原創,保留所有版權。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在IIS上安裝Kooboo CMS Kooboo CMS安裝之後 安裝的常見問題 1. 在IIS上安裝Kooboo CMS Kooboo CMS部署到正式環境相當簡單,安裝過程是一個普通MVC站點在IIS上的架設過程,沒有額外的配置步驟。首先從 "這裡" 下載最新版本的Kooboo CMS安裝包以備 ...
  • 反射 System.Reflection 命名空間下的類與 System.Type 命名空間使你能夠獲取有關載入的程式集和其中定義的類型的有關信息,如類、介面和值類型等。 可以使用反射在運行時創建、調用和訪問類型實例。 CLR 程式管理應用程式域,應用程式域構成具有相同應用程式範圍的對象周圍定義的邊 ...
  • 不知道在博客園寫過的文章是否可以更改,特此測試!!! ...
  • jdbc連接資料庫的四個對象 DriverManager 驅動類 DriverManager.registerDriver(new com.mysql.jdbc.Driver());不建議使用 原因有2個: > 導致驅動被註冊2次。 > 強烈依賴資料庫的驅動jar 解決辦法: Class.forNa ...
  • 似乎隔三差五就能看到一些關於架構師應不應該寫代碼的文章。我是屬於寫代碼派,因為我本身就喜歡寫代碼。但是,當工作職責發生變化之後,如何保持寫代碼和其它工作之間的平衡就成了問題。 從個體效率上來看,我自己親自寫代碼,和很多人相比沒有什麼絕對優勢,甚至有些人碼代碼的速度比我還快一些。 但作為架構師,參與寫 ...
  • 一、變數的概念: (1)記憶體中的一個存儲區域 (2)該區域有自己的名稱(變數名)和類型(數據類型) (3)Java中每個變數必須先聲明,後使用 (4)該區域的數據可以在同一類型範圍內不斷變化 使用變數註意: (1)變數的作用域:一對{ }之間有效 (2)初始化值 定義變數的格式:數據類型 變數名 = ...
  • 一、Java 集合框架 集合框架是一個用來代表和操縱集合的統一架構。所有的集合框架都包含如下內容: 介面:是代表集合的抽象數據類型。介面允許集合獨立操縱其代表的細節。在面向對象的語言,介面通常形成一個層次。 實現(類):是集合介面的具體實現。從本質上講,它們是可重覆使用的數據結構。 演算法:是實現集合 ...
  • 20161230問題解析請點擊今日問題下方的“【Java每日一題】20170103”查看(問題解析在公眾號首發,公眾號ID:weknow619) 今日問題: 請問主程式輸出結果是什麼?(點擊以下“【Java每日一題】20170103”查看20161230問題解析) 題目原發佈於公眾號、簡書:【Jav ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...