乾坤大挪移,如何將同步阻塞(sync)三方庫包轉換為非同步非阻塞(async)模式?Python3.10實現。

来源:https://www.cnblogs.com/v3ucn/archive/2022/12/30/17014299.html
-Advertisement-
Play Games

眾所周知,非同步併發編程可以幫助程式更好地處理阻塞操作,比如網路 IO 操作或文件 IO 操作,避免因等待這些操作完成而導致程式卡住的情況。雲存儲文件傳輸場景正好包含網路 IO 操作和文件 IO 操作,比如業內相對著名的七牛雲存儲,官方sdk的預設阻塞傳輸模式雖然差強人意,但未免有些循規蹈矩,不夠銳意 ...


眾所周知,非同步併發編程可以幫助程式更好地處理阻塞操作,比如網路 IO 操作或文件 IO 操作,避免因等待這些操作完成而導致程式卡住的情況。雲存儲文件傳輸場景正好包含網路 IO 操作和文件 IO 操作,比如業內相對著名的七牛雲存儲,官方sdk的預設阻塞傳輸模式雖然差強人意,但未免有些循規蹈矩,不夠銳意創新。在全球同性交友網站Github上找了一圈,也沒有找到非同步版本,那麼本次我們來自己動手將同步阻塞版本改造為非同步非阻塞版本,並上傳至Python官方庫。

非同步改造

首先參見七牛雲官方介面文檔:https://developer.qiniu.com/kodo,新建qiniu_async.py文件:

# @Author:Liu Yue (v3u.cn)  
# @Software:Vscode  
# @Time:2022/12/30  
  
import base64  
import hmac  
import time  
from hashlib import sha1  
import json  
import httpx  
import aiofiles  
  
  
  
class Qiniu:  
  
    def __init__(self, access_key, secret_key):  
        """初始化"""  
        self.__checkKey(access_key, secret_key)  
        self.__access_key = access_key  
        self.__secret_key = secret_key.encode('utf-8')  
  
    def get_access_key(self):  
        return self.__access_key  
  
    def get_secret_key(self):  
        return self.__secret_key  
  
    def __token(self, data):  
        hashed = hmac.new(self.__secret_key,data.encode('utf-8'), sha1)  
        return self.urlsafe_base64_encode(hashed.digest())  
  
    def token(self, data):  
        return '{0}:{1}'.format(self.__access_key, self.__token(data))  
  
    def token_with_data(self, data):  
        data = self.urlsafe_base64_encode(data)  
        return '{0}:{1}:{2}'.format(  
            self.__access_key, self.__token(data), data)  
  
    def urlsafe_base64_encode(self,data):  
  
        if isinstance(data, str):  
            data = data.encode('utf-8')  
  
        ret = base64.urlsafe_b64encode(data)  
  
        data = ret.decode('utf-8')  
  
        return data  
  
  
    @staticmethod  
    def __checkKey(access_key, secret_key):  
        if not (access_key and secret_key):  
            raise ValueError('invalid key')  
  
  
    def upload_token(  
            self,  
            bucket,  
            key=None,  
            expires=3600,  
            policy=None,  
            strict_policy=True):  
        """生成上傳憑證  
  
        Args:  
            bucket:  上傳的空間名  
            key:     上傳的文件名,預設為空  
            expires: 上傳憑證的過期時間,預設為3600s  
            policy:  上傳策略,預設為空  
  
        Returns:  
            上傳憑證  
        """  
        if bucket is None or bucket == '':  
            raise ValueError('invalid bucket name')  
  
        scope = bucket  
        if key is not None:  
            scope = '{0}:{1}'.format(bucket, key)  
  
        args = dict(  
            scope=scope,  
            deadline=int(time.time()) + expires,  
        )  
  
        return self.__upload_token(args)  
  
    @staticmethod  
    def up_token_decode(up_token):  
        up_token_list = up_token.split(':')  
        ak = up_token_list[0]  
        sign = base64.urlsafe_b64decode(up_token_list[1])  
        decode_policy = base64.urlsafe_b64decode(up_token_list[2])  
        decode_policy = decode_policy.decode('utf-8')  
        dict_policy = json.loads(decode_policy)  
        return ak, sign, dict_policy  
  
    def __upload_token(self, policy):  
        data = json.dumps(policy, separators=(',', ':'))  
        return self.token_with_data(data)  
  
  
    @staticmethod  
    def __copy_policy(policy, to, strict_policy):  
        for k, v in policy.items():  
            if (not strict_policy) or k in _policy_fields:  
                to[k] = v

這裡有兩個很關鍵的非同步非阻塞三方庫,分別是httpx和aiofiles,對應處理網路IO和文件IO阻塞問題:

pip3 install httpx  
pip3 install aiofiles

隨後按照文檔流程通過加密方法獲取文件上傳token,這裡無須進行非同步改造,因為並不涉及IO操作:

q = Qiniu(access_key,access_secret)  
  
token = q.upload_token("空間名稱")  
  
print(token)

程式返回:

➜  mydemo git:(master) ✗ /opt/homebrew/bin/python3.10 "/Users/liuyue/wodfan/work/mydemo/src/test.py"  
q06bq54Ps5JLfZyP8Ax-qvByMBdu8AoIVJpMco2m:8RjIo9a4CxHM3009DwjbMxDzlU8=:eyJzY29wZSI6ImFkLWgyMTEyIiwiZGVhZGxpbmUiOjE2NzIzNjg2NTd9

接著添加文件流推送方法,先看官方原版邏輯:

def put_data(  
        up_token, key, data, params=None, mime_type='application/octet-stream', check_crc=False, progress_handler=None,  
        fname=None, hostscache_dir=None, metadata=None):  
    """上傳二進位流到七牛  
    Args:  
        up_token:         上傳憑證  
        key:              上傳文件名  
        data:             上傳二進位流  
        params:           自定義變數,規格參考 https://developer.qiniu.com/kodo/manual/vars#xvar  
        mime_type:        上傳數據的mimeType  
        check_crc:        是否校驗crc32  
        progress_handler: 上傳進度  
        hostscache_dir:   host請求 緩存文件保存位置  
        metadata:         元數據  
    Returns:  
        一個dict變數,類似 {"hash": "<Hash string>", "key": "<Key string>"}  
        一個ResponseInfo對象  
    """  
    final_data = b''  
    if hasattr(data, 'read'):  
        while True:  
            tmp_data = data.read(config._BLOCK_SIZE)  
            if len(tmp_data) == 0:  
                break  
            else:  
                final_data += tmp_data  
    else:  
        final_data = data  
  
    crc = crc32(final_data)  
    return _form_put(up_token, key, final_data, params, mime_type,  
                     crc, hostscache_dir, progress_handler, fname, metadata=metadata)  
  
def _form_put(up_token, key, data, params, mime_type, crc, hostscache_dir=None, progress_handler=None, file_name=None,  
              modify_time=None, keep_last_modified=False, metadata=None):  
    fields = {}  
    if params:  
        for k, v in params.items():  
            fields[k] = str(v)  
    if crc:  
        fields['crc32'] = crc  
    if key is not None:  
        fields['key'] = key  
  
    fields['token'] = up_token  
    if config.get_default('default_zone').up_host:  
        url = config.get_default('default_zone').up_host  
    else:  
        url = config.get_default('default_zone').get_up_host_by_token(up_token, hostscache_dir)  
    # name = key if key else file_name  
  
    fname = file_name  
    if not fname or not fname.strip():  
        fname = 'file_name'  
  
    # last modify time  
    if modify_time and keep_last_modified:  
        fields['x-qn-meta-!Last-Modified'] = rfc_from_timestamp(modify_time)  
  
    if metadata:  
        for k, v in metadata.items():  
            if k.startswith('x-qn-meta-'):  
                fields[k] = str(v)  
  
    r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})  
    if r is None and info.need_retry():  
        if info.connect_failed:  
            if config.get_default('default_zone').up_host_backup:  
                url = config.get_default('default_zone').up_host_backup  
            else:  
                url = config.get_default('default_zone').get_up_host_backup_by_token(up_token, hostscache_dir)  
        if hasattr(data, 'read') is False:  
            pass  
        elif hasattr(data, 'seek') and (not hasattr(data, 'seekable') or data.seekable()):  
            data.seek(0)  
        else:  
            return r, info  
        r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})  
  
    return r, info

這裡官方使用兩個方法,先試用put_data方法將字元串轉換為二進位文件流,隨後調用_form_put進行同步上傳操作,這裡_form_put這個私有方法是可復用的,既相容文件流也相容文件實體,寫法上非常值得我們借鑒,弄明白了官方原版的流程後,讓我們撰寫文件流傳輸的非同步版本:

# 上傳文件流  
    async def upload_data(self,up_token, key,data,url="http://up-z1.qiniup.com",params=None,mime_type='application/octet-stream',file_name=None,metadata=None):  
  
        data.encode('utf-8')  
          
        fields = {}  
        if params:  
            for k, v in params.items():  
                fields[k] = str(v)  
  
        if key is not None:  
            fields['key'] = key  
        fields['token'] = up_token  
  
        fname = file_name  
        if not fname or not fname.strip():  
            fname = 'file_name'  
  
        async with httpx.AsyncClient() as client:  
  
            # 調用非同步使用await關鍵字  
            res = await client.post(url,data=fields,files={'file': (fname,data,mime_type)})  
  
            print(res.text)

這裡我們聲明非同步方法upload_data,通過encode直接轉換文件流,並使用非同步httpx.AsyncClient()對象將文件流推送到官網介面地址:up-z1.qiniup.com

隨後進行測試:

import asyncio
q = qiniu_async.Qiniu("accesskey","accesssecret")  
  
token = q.upload_token("空間名稱")  
  
#文件流上傳  
asyncio.run(q.upload_data(token,"3343.txt","123測試"))

程式返回:

➜  mydemo git:(master) ✗ /opt/homebrew/bin/python3.10 "/Users/liuyue/wodfan/work/mydemo/src/test.py"  
{"hash":"FtnQXAXft5AsOH1mrmXGaRzSt-95","key":"33434.txt"}

介面會返迴文件流的hash編碼,沒有問題。

接著查看文件上傳流程:

def put_file(up_token, key, file_path, params=None,  
             mime_type='application/octet-stream', check_crc=False,  
             progress_handler=None, upload_progress_recorder=None, keep_last_modified=False, hostscache_dir=None,  
             part_size=None, version=None, bucket_name=None, metadata=None):  
    """上傳文件到七牛  
    Args:  
        up_token:                 上傳憑證  
        key:                      上傳文件名  
        file_path:                上傳文件的路徑  
        params:                   自定義變數,規格參考 https://developer.qiniu.com/kodo/manual/vars#xvar  
        mime_type:                上傳數據的mimeType  
        check_crc:                是否校驗crc32  
        progress_handler:         上傳進度  
        upload_progress_recorder: 記錄上傳進度,用於斷點續傳  
        hostscache_dir:           host請求 緩存文件保存位置  
        version:                  分片上傳版本 目前支持v1/v2版本 預設v1  
        part_size:                分片上傳v2必傳欄位 預設大小為4MB 分片大小範圍為1 MB - 1 GB  
        bucket_name:              分片上傳v2欄位 空間名稱  
        metadata:                 元數據信息  
    Returns:  
        一個dict變數,類似 {"hash": "<Hash string>", "key": "<Key string>"}  
        一個ResponseInfo對象  
    """  
    ret = {}  
    size = os.stat(file_path).st_size  
    with open(file_path, 'rb') as input_stream:  
        file_name = os.path.basename(file_path)  
        modify_time = int(os.path.getmtime(file_path))  
        if size > config.get_default('default_upload_threshold'):  
            ret, info = put_stream(up_token, key, input_stream, file_name, size, hostscache_dir, params,  
                                   mime_type, progress_handler,  
                                   upload_progress_recorder=upload_progress_recorder,  
                                   modify_time=modify_time, keep_last_modified=keep_last_modified,  
                                   part_size=part_size, version=version, bucket_name=bucket_name, metadata=metadata)  
        else:  
            crc = file_crc32(file_path)  
            ret, info = _form_put(up_token, key, input_stream, params, mime_type,  
                                  crc, hostscache_dir, progress_handler, file_name,  
                                  modify_time=modify_time, keep_last_modified=keep_last_modified, metadata=metadata)  
    return ret, info

這裡官方使用的是標準庫上下文管理器同步讀取文件,改寫為非同步方法:

# 上傳文件實體  
    async def upload_file(self,up_token,key,path,url="http://up-z1.qiniup.com",params=None,mime_type='application/octet-stream',file_name=None,metadata=None):  
  
  
        async with aiofiles.open(path, mode='rb') as f:  
            contents = await f.read()  
          
        fields = {}  
        if params:  
            for k, v in params.items():  
                fields[k] = str(v)  
  
        if key is not None:  
            fields['key'] = key  
        fields['token'] = up_token  
  
        fname = file_name  
        if not fname or not fname.strip():  
            fname = 'file_name'  
  
        async with httpx.AsyncClient() as client:  
  
            # 調用非同步使用await關鍵字  
            res = await client.post(url,data=fields,files={'file': (fname,contents,mime_type)})  
  
            print(res.text)

通過aiofiles非同步讀取文件後,在通過httpx.AsyncClient()進行非同步傳輸。

需要註意的是,這裡預設傳輸到up-z1.qiniup.com介面,如果是不同區域的雲存儲伺服器,需要更改url參數的值,具體伺服器介面列表請參照官網文檔。

至此,文件流和文件非同步傳輸就改造好了。

上傳至Python官方庫

為了方便廣大七牛雲用戶使用非同步傳輸版本庫,可以將qiniu-async上傳到Python官方庫,首先註冊成為Python官方庫的開發者:pypi.org/

隨後在項目根目錄下新建setup.py文件:

import setuptools  
import pathlib  
  
here = pathlib.Path(__file__).parent.resolve()  
long_description = (here / "README.md").read_text(encoding="utf-8")  
  
setuptools.setup(  
    name="qiniu-async",  
    version="1.0.1",  
    author="LiuYue",  
    author_email="[email protected]",  
    description="qiniu_async python library",  
    long_description=long_description,  
    long_description_content_type="text/markdown",  
    url="https://github.com/qiniu-async",  
    packages=setuptools.find_packages(),  
    license="Apache 2.0",  
    classifiers=[  
        "Development Status :: 3 - Alpha",  
        "Intended Audience :: Developers",  
        "Programming Language :: Python :: 3",  
        "Programming Language :: Python :: 3.7",  
        "Programming Language :: Python :: 3.8",  
        "Programming Language :: Python :: 3.9",  
        "Programming Language :: Python :: 3.10",  
        "Programming Language :: Python :: 3 :: Only",  
        "License :: OSI Approved :: MIT License",  
        "Operating System :: OS Independent",  
  
    ],  
    keywords="qiniu, qiniu_async, async",  
    py_modules=[  
        'qiniu_async'  
    ],  
    install_requires=["aiofiles","httpx"],  
)

這是安裝文件,主要為了聲明該模塊的名稱、作者、版本以及依賴庫。

隨後本地打包文件:

python3 setup.py sdist

程式會根據setup.py文件生成壓縮包:

➜  qiniu_async tree  
.  
├── README.md  
├── dist  
│ └── qiniu-async-1.0.1.tar.gz  
├── https:  
│ └── github.com  
│     └── zcxey2911  
│         └── qiniu-async.git  
├── qiniu_async.egg-info  
│ ├── PKG-INFO  
│ ├── SOURCES.txt  
│ ├── dependency_links.txt  
│ ├── requires.txt  
│ └── top_level.txt  
├── qiniu_async.py  
└── setup.py

接著安裝twine庫, 準備提交Python官網:

pip3 install twine

隨後在根目錄運行命令提交:

twine upload dist/*

在官網進行查看:https://pypi.org/project/qiniu-async/

隨後本地就可以直接通過pip命令句進行安裝了:

pip install qiniu-async -i https://pypi.org/simple

非常方便。

結語

雲端存儲,非同步加持,猛虎添翼,未敢擁缽獨饗,除了通過pip安裝qiniu-async庫,也奉上Github項目地址:https://github.com/zcxey2911/qiniu-async ,與眾鄉親同饗。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 談起消息隊列,內心還是會有些波瀾。 消息隊列、緩存、分庫分表是高併發解決方案三劍客,而消息隊列是我最喜歡,也是思考最多的技術。我想按照下麵的四個階段分享我與消息隊列的故事,同時也是對我技術成長經歷的回顧。 ...
  • C語言 我們在學習電腦學科時,往往最先接觸到的編程語言是C,它是所有語言中,最接近底層的高級語言之一,因而它具有執行速度快的優點。但它又具有開發周期長和對於經驗不足的開發者極容易犯錯的缺點。C語言應用範圍廣泛,你幾乎可以在任何場景中看到它的影子。 C語言編譯原理 一個編寫好的C代碼經過編譯成可執行 ...
  • jdk安裝 下載jdk 由於現在主流就是jdk1.8,所以這裡就下載jdk1.8進行演示。官方下載地址:https://www.oracle.com/java/technologies/downloads/#java8-windows。 官方下載需要註冊oracle賬號,國內下載有可能速度慢,若不想 ...
  • 題目來源 400. 第 N 位數字 題目詳情 給你一個整數 n ,請你在無限的整數序列 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, ...] 中找出並返回第 n 位上的數字。 示例 1: 輸入: n = 3 輸出: 3 示例 2: 輸入: n = 11 輸出: 0 解釋: ...
  • #增強for迴圈 增強for迴圈 (也稱for each迴圈) 是迭代器遍歷方法的一個“簡化版”,是JDK1.5以後出來的一個高級for迴圈,專門用來遍曆數組和集合。 普通for迴圈 int[] num = {1,2,3,4,5,6}; for(int i = 0 ; i<num.length ; ...
  • RocketMQ 優異的性能表現,必然繞不開其優秀的存儲模型 。 這篇文章,筆者按照自己的理解 , 嘗試分析 RocketMQ 的存儲模型,希望對大家有所啟發。 1 整體概覽 首先溫習下 RocketMQ 架構。 整體架構中包含四種角色 : Producer :消息發佈的角色,Producer 通過 ...
  • JZ74 和為S的連續正數序列 題目 小明很喜歡數學,有一天他在做數學作業時,要求計算出9~16的和,他馬上就寫出了正確答案是100。 但是他並不滿足於此,他在想究竟有多少種連續的正數序列的和為100(至少包括兩個數)。 沒多久,他就得到另一組連續正數和為100的序列:18,19,20,21,22。 ...
  • 1. C++常量表達式 constexpr 是 C++ 11 標準新引入的關鍵字,在學習其具體用法和功能之前,我們需要先搞清楚 C++ 常量表達式的含義。 所謂常量表達式,指的就是由多個(≥1)常量組成的表達式。換句話說,如果表達式中的成員都是常量,那麼該表達式就是一個常量表達式。這也意味著,常量表 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 在我們開發過程中基本上不可或缺的用到一些敏感機密數據,比如SQL伺服器的連接串或者是OAuth2的Secret等,這些敏感數據在代碼中是不太安全的,我們不應該在源代碼中存儲密碼和其他的敏感數據,一種推薦的方式是通過Asp.Net Core的機密管理器。 機密管理器 在 ASP.NET Core ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 順序棧的介面程式 目錄順序棧的介面程式頭文件創建順序棧入棧出棧利用棧將10進位轉16進位數驗證 頭文件 #include <stdio.h> #include <stdbool.h> #include <stdlib.h> 創建順序棧 // 指的是順序棧中的元素的數據類型,用戶可以根據需要進行修改 ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • C總結與剖析:關鍵字篇 -- <<C語言深度解剖>> 目錄C總結與剖析:關鍵字篇 -- <<C語言深度解剖>>程式的本質:二進位文件變數1.變數:記憶體上的某個位置開闢的空間2.變數的初始化3.為什麼要有變數4.局部變數與全局變數5.變數的大小由類型決定6.任何一個變數,記憶體賦值都是從低地址開始往高地 ...
  • 如果讓你來做一個有狀態流式應用的故障恢復,你會如何來做呢? 單機和多機會遇到什麼不同的問題? Flink Checkpoint 是做什麼用的?原理是什麼? ...
  • C++ 多級繼承 多級繼承是一種面向對象編程(OOP)特性,允許一個類從多個基類繼承屬性和方法。它使代碼更易於組織和維護,並促進代碼重用。 多級繼承的語法 在 C++ 中,使用 : 符號來指定繼承關係。多級繼承的語法如下: class DerivedClass : public BaseClass1 ...
  • 前言 什麼是SpringCloud? Spring Cloud 是一系列框架的有序集合,它利用 Spring Boot 的開發便利性簡化了分散式系統的開發,比如服務註冊、服務發現、網關、路由、鏈路追蹤等。Spring Cloud 並不是重覆造輪子,而是將市面上開發得比較好的模塊集成進去,進行封裝,從 ...
  • class_template 類模板和函數模板的定義和使用類似,我們已經進行了介紹。有時,有兩個或多個類,其功能是相同的,僅僅是數據類型不同。類模板用於實現類所需數據的類型參數化 template<class NameType, class AgeType> class Person { publi ...
  • 目錄system v IPC簡介共用記憶體需要用到的函數介面shmget函數--獲取對象IDshmat函數--獲得映射空間shmctl函數--釋放資源共用記憶體實現思路註意 system v IPC簡介 消息隊列、共用記憶體和信號量統稱為system v IPC(進程間通信機制),V是羅馬數字5,是UNI ...