PostgreSQL非同步客戶端(並模擬redis 數據結構)

来源:http://www.cnblogs.com/irons/archive/2016/04/08/5369796.html
-Advertisement-
Play Games

以前為了不在游戲邏輯(對象屬性)變更時修改資料庫,就弄了個varchar欄位來表示json,由伺服器邏輯(讀取到記憶體)去操作它。 但這對運維相當不友好,也不能做一些此Json數據里查詢。 所以後面就用了下ssdb,然而就在前幾天才瞭解到postgresql支持json了(其實早在兩年前就行了吧··· ...


以前為了不在游戲邏輯(對象屬性)變更時修改資料庫,就弄了個varchar欄位來表示json,由伺服器邏輯(讀取到記憶體)去操作它。

但這對運維相當不友好,也不能做一些此Json數據里查詢。

 

所以後面就用了下ssdb,然而就在前幾天才瞭解到postgresql支持json了(其實早在兩年前就行了吧···)

就這點差不多就可以算當作mongodb用了,不過還是不支持redis的高級數據結構。

 

於是我就想模擬(實現)下redis(的數據結構)。

就抽空看了下它的c api庫:libpq,發現其請求-等待模型,在網路延遲高的時候,特別影響qps。所以我就寫了一個非同步客戶端,並簡易模擬了redis的kv,hash。

開8個鏈接到pg server,其速度比1個鏈接快5倍。 在我的測試中,每秒打到30k QPS

(目前不支持list,以及後期還要通過儲存過程對現在的hash實現進行改造優化)

 

#include <string>
#include <list>
#include <iostream>
#include <unordered_map>
#include <memory>
#include <queue>
#include <assert.h>
#include <functional>
#include <sstream>
#include <chrono>

#include "fdset.h"

#include "libpq-events.h"
#include "libpq-fe.h"
#include "libpq/libpq-fs.h"

using namespace std;

class AsyncPGClient
{
public:
    /*TODO::傳遞錯誤信息*/
    typedef std::function<void(const PGresult*)> RESULT_CALLBACK;
    typedef std::function<void(bool value)> BOOL_RESULT_CALLBACK;
    typedef std::function<void(const string& value)> STRING_RESULT_CALLBACK;
    typedef std::function<void(const std::unordered_map<string, string>& value)> STRINGMAP_RESULT_CALLBACK;

    AsyncPGClient() : mKVTableName("kv_data"), mHashTableName("hashmap_data")
    {
        mfdset = ox_fdset_new();
    }

    ~AsyncPGClient()
    {
        for (auto& kv : mConnections)
        {
            PQfinish((*kv.second).pgconn);
        }

        ox_fdset_delete(mfdset);
        mfdset = nullptr;
    }

    void    get(const string& key, const STRING_RESULT_CALLBACK& callback = nullptr)
    {
        mStringStream << "SELECT key, value FROM public." << mKVTableName << " where key = '" << key << "';";

        postQuery(mStringStream.str(), [callback](const PGresult* result){
            if (callback != nullptr && result != nullptr)
            {
                if (PQntuples(result) == 1 && PQnfields(result) == 2)
                {
                    callback(PQgetvalue(result, 0, 1));
                }
            }
        });
    }

    void    set(const string& key, const string& v, const BOOL_RESULT_CALLBACK& callback = nullptr)
    {
        mStringStream << "INSERT INTO public." << mKVTableName << "(key, value) VALUES('" << key << "', '" << v << "') ON CONFLICT(key) DO UPDATE SET value = EXCLUDED.value;";

        postQuery(mStringStream.str(), [callback](const PGresult* result){
            if (callback != nullptr)
            {
                if (PQresultStatus(result) == PGRES_COMMAND_OK)
                {
                    callback(true);
                }
                else
                {
                    cout << PQresultErrorMessage(result);
                    callback(false);
                }
            }
        });
    }

    void    hget(const string& hashname, const string& key, const STRING_RESULT_CALLBACK& callback = nullptr)
    {
        hmget(hashname, { key }, [callback](const std::unordered_map<string, string>& value){
            if (callback != nullptr && !value.empty())
            {
                callback((*value.begin()).second);
            }
        });
    }

    void    hmget(const string& hashname, const std::vector<string>& keys, const STRINGMAP_RESULT_CALLBACK& callback = nullptr)
    {
        mStringStream << "SELECT key, value FROM public." << mHashTableName << " where ";
        auto it = keys.begin();
        do
        {
            mStringStream << "key='" << (*it) << "'";

            ++it;
        } while (it != keys.end() && &(mStringStream << " or ") != nullptr);
        mStringStream << ";";

        postQuery(mStringStream.str(), [callback](const PGresult* result){
            if (callback != nullptr)
            {
                std::unordered_map<string, string> ret;
                if (PQresultStatus(result) == PGRES_TUPLES_OK)
                {
                    int num = PQntuples(result);
                    int fileds = PQnfields(result);
                    if (fileds == 2)
                    {
                        for (int i = 0; i < num; i++)
                        {
                            ret[PQgetvalue(result, i, 0)] = PQgetvalue(result, i, 1);
                        }
                    }
                }

                callback(ret);
            }
        });
    }

    void    hset(const string& hashname, const string& key, const string& value, const BOOL_RESULT_CALLBACK& callback = nullptr)
    {
        mStringStream << "INSERT INTO public." << mHashTableName << "(hashname, key, value) VALUES('" << hashname << "', '" << key << "', '" << value
            << "') ON CONFLICT (hashname, key) DO UPDATE SET value = EXCLUDED.value;";

        postQuery(mStringStream.str(), [callback](const PGresult* result){
            if (callback != nullptr)
            {
                callback(PQresultStatus(result) == PGRES_COMMAND_OK);
            }
        });
    }

    void  hgetall(const string& hashname, const STRINGMAP_RESULT_CALLBACK& callback = nullptr)
    {
        mStringStream << "SELECT key, value FROM public." << mHashTableName << " where hashname = '" << hashname << "';";
        postQuery(mStringStream.str(), [callback](const PGresult* result){
            if (callback != nullptr)
            {
                std::unordered_map<string, string> ret;
                if (PQresultStatus(result) == PGRES_TUPLES_OK)
                {
                    int num = PQntuples(result);
                    int fileds = PQnfields(result);
                    if (fileds == 2)
                    {
                        for (int i = 0; i < num; i++)
                        {
                            ret[PQgetvalue(result, i, 0)] = PQgetvalue(result, i, 1);
                        }
                    }
                }

                callback(ret);
            }
        });
    }

    void    postQuery(const string&& query, const RESULT_CALLBACK& callback = nullptr)
    {
        mPendingQuery.push({ std::move(query), callback});
        mStringStream.str(std::string());
        mStringStream.clear();
    }

    void    postQuery(const string& query, const RESULT_CALLBACK& callback = nullptr)
    {
        mPendingQuery.push({ query, callback });
        mStringStream.str(std::string());
        mStringStream.clear();
    }

public:
    void    poll(int millSecond)
    {
        ox_fdset_poll(mfdset, millSecond);

        std::vector<int> closeFds;

        for (auto& it : mConnections)
        {
            auto fd = it.first;
            auto connection = it.second;
            auto pgconn = connection->pgconn;

            if (ox_fdset_check(mfdset, fd, ReadCheck))
            {
                if (PQconsumeInput(pgconn) > 0 && PQisBusy(pgconn) == 0)
                {
                    bool successGetResult = false;

                    while (true)
                    {
                        auto result = PQgetResult(pgconn);
                        if (result != nullptr)
                        {
                            successGetResult = true;
                            if (connection->callback != nullptr)
                            {
                                connection->callback(result);
                                connection->callback = nullptr;
                            }
                            PQclear(result);
                        }
                        else
                        {
                            break;
                        }
                    }

                    if (successGetResult)
                    {
                        mIdleConnections.push_back(connection);
                    }
                }

                if (PQstatus(pgconn) == CONNECTION_BAD)
                {
                    closeFds.push_back(fd);
                }
            }

            if (ox_fdset_check(mfdset, fd, WriteCheck))
            {
                if (PQflush(pgconn) == 0)
                {
                    //移除可寫檢測
                    ox_fdset_del(mfdset, fd, WriteCheck);
                }
            }
        }

        for (auto& v : closeFds)
        {
            removeConnection(v);
        }
    }

    void    trySendPendingQuery()
    {
        while (!mPendingQuery.empty() && !mIdleConnections.empty())
        {
            auto& query = mPendingQuery.front();
            auto& connection = mIdleConnections.front();

            if (PQsendQuery(connection->pgconn, query.request.c_str()) == 0)
            {
                cout << PQerrorMessage(connection->pgconn) << endl;
                if (query.callback != nullptr)
                {
                    query.callback(nullptr);
                }
            }
            else
            {
                ox_fdset_add(mfdset, PQsocket(connection->pgconn), WriteCheck);
                connection->callback = query.callback;
            }

            mPendingQuery.pop();
            mIdleConnections.pop_front();
        }
    }

    size_t  pendingQueryNum() const
    {
        return mPendingQuery.size();
    }

    size_t  getWorkingQuery() const
    {
        return mConnections.size() - mIdleConnections.size();
    }

    void    createConnection(  const char *pghost, const char *pgport,
                        const char *pgoptions, const char *pgtty,
                        const char *dbName, const char *login, const char *pwd,
                        int num)
    {
        for (int i = 0; i < num; i++)
        {
            auto pgconn = PQsetdbLogin(pghost, pgport, pgoptions, pgtty, dbName, login, pwd);
            if (PQstatus(pgconn) == CONNECTION_OK)
            {
                auto connection = std::make_shared<Connection>(pgconn, nullptr);
                mConnections[PQsocket(pgconn)] = connection;
                PQsetnonblocking(pgconn, 1);
                ox_fdset_add(mfdset, PQsocket(pgconn), ReadCheck);
                mIdleConnections.push_back(connection);
            }
            else
            {
                cout << PQerrorMessage(pgconn);
                PQfinish(pgconn);
                pgconn = nullptr;
            }
        }

        if (!mConnections.empty())
        {
            sCreateTable((*mConnections.begin()).second->pgconn, mKVTableName, mHashTableName);
        }
    }

private:
    void    removeConnection(int fd)
    {
        auto it = mConnections.find(fd);
        if (it != mConnections.end())
        {
            auto connection = (*it).second;
            for (auto it = mIdleConnections.begin(); it != mIdleConnections.end(); ++it)
            {
                if ((*it)->pgconn == connection->pgconn)
                {
                    mIdleConnections.erase(it);
                    break;
                }
            }

            ox_fdset_del(mfdset, fd, ReadCheck | WriteCheck);
            PQfinish(connection->pgconn);
            mConnections.erase(fd);
        }
    }

private:
    static  void    sCreateTable(PGconn* conn, const string& kvTableName, const string& hashTableName)
    {
        {
            string query = "CREATE TABLE public.";
            query += kvTableName;
            query += "(key character varying NOT NULL, value json, CONSTRAINT key PRIMARY KEY(key))";
            PGresult* exeResult = PQexec(conn, query.c_str());
            auto status = PQresultStatus(exeResult);
            auto errorStr = PQresultErrorMessage(exeResult);
            PQclear(exeResult);
        }

        {
            string query = "CREATE TABLE public.";
            query += hashTableName;
            query += "(hashname character varying, key character varying, value json, "
                    "CONSTRAINT hk PRIMARY KEY (hashname, key))";
            PGresult* exeResult = PQexec(conn, query.c_str());
            auto status = PQresultStatus(exeResult);
            auto errorStr = PQresultErrorMessage(exeResult);
            PQclear(exeResult);
        }
    }

private:
    struct QueryAndCallback
    {
        std::string request;
        RESULT_CALLBACK  callback;
    };

    struct Connection
    {
        PGconn* pgconn;
        RESULT_CALLBACK callback;

        Connection(PGconn* p, RESULT_CALLBACK c)
        {
            pgconn = p;
            callback = c;
        }
    };

    const string                                    mKVTableName;
    const string                                    mHashTableName;

    stringstream                                    mStringStream;
    fdset_s*                                        mfdset;

    std::unordered_map<int, shared_ptr<Connection>> mConnections;
    std::list<shared_ptr<Connection>>               mIdleConnections;

    std::queue<QueryAndCallback>                    mPendingQuery;

    /*TODO::監聽wakeup支持*/
    /*TODO::考慮固定分配connection給某業務*/

    /*TODO::編寫儲存過程,替換現有的hashtable模擬方式,如迴圈使用jsonb_set以及 select value->k1, value->k2 from ...*/
    /*TODO::編寫儲存過程,實現list*/
};

int main()
{
    using std::chrono::system_clock;

    AsyncPGClient asyncClient;
    asyncClient.createConnection("192.168.12.1", "5432", nullptr, nullptr, "postgres", "postgres", "19870323", 8);
    system_clock::time_point startTime = system_clock::now();

    auto nowTime = time(NULL);
    
    for (int i = 0; i < 100000; i++)
    {
        if(false)
        {
            string test = "INSERT INTO public.kv_data(key, value) VALUES ('";
            test += std::to_string(nowTime*1000+i);
            test += "', '{\"hp\":100000}') ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value;";

            asyncClient.postQuery(test);
        }
        else
        {
            asyncClient.postQuery("select * from public.kv_data where key='dd';");
        }
    }

    asyncClient.postQuery("INSERT INTO public.kv_data(key, value) VALUES ('dodo5', '{\"hp\":100000}') "
        " ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value", [](const PGresult* result){
        cout << "fuck" << endl;
    });

    asyncClient.get("dd", [](const string& value){
        cout << "get dd : " << value << endl;
    });

    asyncClient.set("dd", "{\"hp\":456}", [](bool isOK){
        cout << "set dd : " << isOK << endl;
    });

    asyncClient.hget("heros:dodo", "hp", [](const string& value){
        cout << "hget heros:dodo:" << value << endl;
    });

    asyncClient.hset("heros:dodo", "hp", "{\"hp\":1}", [](bool isOK){
        cout << "hset heros:dodo:" << isOK << endl;
    });

    asyncClient.hmget("heros:dodo", { "hp", "money" }, [](const unordered_map<string, string>& kvs){
        cout << "hmget:" << endl;
        for (auto& kv : kvs)
        {
            cout << kv.first << " : " << kv.second << endl;
        }
    });

    asyncClient.hgetall("heros:dodo", [](const unordered_map<string, string>& kvs){
        cout << "hgetall:" << endl;
        for (auto& kv : kvs)
        {
            cout << kv.first << " : " << kv.second << endl;
        }
    });

    while (true)
    {
        asyncClient.poll(1);
        asyncClient.trySendPendingQuery();
        if (asyncClient.pendingQueryNum() == 0 && asyncClient.getWorkingQuery() == 0)
        {
            break;
        }
    }

    auto elapsed = system_clock::now() - startTime;
    cout << "cost :" << chrono::duration<double>(elapsed).count() << "s" << endl;
    cout << "enter any key exit" << endl;
    cin.get();
    return 0;
}

 

代碼地址:https://github.com/IronsDu/accumulation-dev/blob/master/examples/Pgedis.cpp


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 先上效果圖如下: 1、本公式自定義配置計算器的實現基於DataTable.Compute()的用法,該函數用法詳細參考文檔;本示例支持條件公式、計算公式的配置 2、界面樣式 3、界面腳本 4、界面HTML源碼 5、後臺代碼 6、擴展 在此基礎上可以進一步發揮的空間很大 》結合存儲設計如資料庫等,參數 ...
  • 在網頁中,我們經常需要引用大量的javascript和css文件,在加上許多javascript庫都包含debug版和經過壓縮的release版(比如jquery),不僅麻煩還很容易引起混亂,所以ASP.NET MVC4引入了Bundles特性,使得我們可以方便的管理javascript和css文件 ...
  • 新公司,剛來幾天,閑著沒事,領導讓我做些無關痛癢的活,優化報表統計!!!之前是用flash做的,現在要改成echart實現。好吧,之前沒用過,抱著學習態度,研究了下。寫點東西打發下時間,能幫到需要幫助朋友更好。好了廢話少說,開搞! 第一步搞個頁面: 第二部:添加相關js引用,參照api,初始化js和 ...
  • 在C#中進行RSA解密,需要用RSACryptoServiceProvider,但是不支持OpenSSL格式的公鑰或者私鑰。 X509 公鑰 非加密 PKCS#8 私鑰 PKCS#1 私鑰 引用以下 Nuget 包 https://www.nuget.org/packages/System.Exte ...
  • 題目: 統計一個字元串中數字和字元串的個數,並分別進行排列,要求 1.數字,字元串可以從鍵盤獲取。 2.儲存在list 3.統計數字個數,字元串個數 4.把數字和字元串按從小到大的順序輸出 5.不能使用數組. List的用法 List包括List介面以及List介面的所有實現類。因為List介面實現 ...
  • 一、Java介紹: Java技術主要分成三個部分:Java語言、Java運行環境和Java類庫。(一般情況下並不區分指哪個部分) 即Java並不只是一門編程語言,也是一個完整的平臺,有一套龐大的開發類庫(包含很多可以重覆利用的代碼)和提供跨平臺的可移植性、自動垃圾回收以及安全性等服務的執行環境。 1 ...
  • 在shell環境中,通配符能夠匹配文本範圍相當有限。正則表達式是一種用於文本匹配的形式小巧、且高度針對性的編程語言。例如: @[a-z0-9]+\.[a-z]+ 就是一個能匹配電子郵件的正則表達式。 正則表達式的基礎組成部分如下所示: 正則表達式 描述 示例 ^ 行起始標記 ^a匹配以a起始的行 $ ...
  • 偽靜態的實現 簡介 主要使用Apache提供的一個rewrite模塊來實現,可以實現URL地址的重寫 使用 開啟配置 更改虛擬主機裡面的配置 在網站根目錄建立一個.htaccess文件 案例1 實現將動態URL地址轉換成一個靜態的HTML地址 http://local.order.com/index... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...