乾坤大挪移,如何將同步阻塞(sync)三方庫包轉換為非同步非阻塞(async)模式?Python3.10實現。

来源:https://www.cnblogs.com/v3ucn/archive/2022/12/30/17014299.html
-Advertisement-
Play Games

眾所周知,非同步併發編程可以幫助程式更好地處理阻塞操作,比如網路 IO 操作或文件 IO 操作,避免因等待這些操作完成而導致程式卡住的情況。雲存儲文件傳輸場景正好包含網路 IO 操作和文件 IO 操作,比如業內相對著名的七牛雲存儲,官方sdk的預設阻塞傳輸模式雖然差強人意,但未免有些循規蹈矩,不夠銳意 ...


眾所周知,非同步併發編程可以幫助程式更好地處理阻塞操作,比如網路 IO 操作或文件 IO 操作,避免因等待這些操作完成而導致程式卡住的情況。雲存儲文件傳輸場景正好包含網路 IO 操作和文件 IO 操作,比如業內相對著名的七牛雲存儲,官方sdk的預設阻塞傳輸模式雖然差強人意,但未免有些循規蹈矩,不夠銳意創新。在全球同性交友網站Github上找了一圈,也沒有找到非同步版本,那麼本次我們來自己動手將同步阻塞版本改造為非同步非阻塞版本,並上傳至Python官方庫。

非同步改造

首先參見七牛雲官方介面文檔:https://developer.qiniu.com/kodo,新建qiniu_async.py文件:

# @Author:Liu Yue (v3u.cn)  
# @Software:Vscode  
# @Time:2022/12/30  
  
import base64  
import hmac  
import time  
from hashlib import sha1  
import json  
import httpx  
import aiofiles  
  
  
  
class Qiniu:  
  
    def __init__(self, access_key, secret_key):  
        """初始化"""  
        self.__checkKey(access_key, secret_key)  
        self.__access_key = access_key  
        self.__secret_key = secret_key.encode('utf-8')  
  
    def get_access_key(self):  
        return self.__access_key  
  
    def get_secret_key(self):  
        return self.__secret_key  
  
    def __token(self, data):  
        hashed = hmac.new(self.__secret_key,data.encode('utf-8'), sha1)  
        return self.urlsafe_base64_encode(hashed.digest())  
  
    def token(self, data):  
        return '{0}:{1}'.format(self.__access_key, self.__token(data))  
  
    def token_with_data(self, data):  
        data = self.urlsafe_base64_encode(data)  
        return '{0}:{1}:{2}'.format(  
            self.__access_key, self.__token(data), data)  
  
    def urlsafe_base64_encode(self,data):  
  
        if isinstance(data, str):  
            data = data.encode('utf-8')  
  
        ret = base64.urlsafe_b64encode(data)  
  
        data = ret.decode('utf-8')  
  
        return data  
  
  
    @staticmethod  
    def __checkKey(access_key, secret_key):  
        if not (access_key and secret_key):  
            raise ValueError('invalid key')  
  
  
    def upload_token(  
            self,  
            bucket,  
            key=None,  
            expires=3600,  
            policy=None,  
            strict_policy=True):  
        """生成上傳憑證  
  
        Args:  
            bucket:  上傳的空間名  
            key:     上傳的文件名,預設為空  
            expires: 上傳憑證的過期時間,預設為3600s  
            policy:  上傳策略,預設為空  
  
        Returns:  
            上傳憑證  
        """  
        if bucket is None or bucket == '':  
            raise ValueError('invalid bucket name')  
  
        scope = bucket  
        if key is not None:  
            scope = '{0}:{1}'.format(bucket, key)  
  
        args = dict(  
            scope=scope,  
            deadline=int(time.time()) + expires,  
        )  
  
        return self.__upload_token(args)  
  
    @staticmethod  
    def up_token_decode(up_token):  
        up_token_list = up_token.split(':')  
        ak = up_token_list[0]  
        sign = base64.urlsafe_b64decode(up_token_list[1])  
        decode_policy = base64.urlsafe_b64decode(up_token_list[2])  
        decode_policy = decode_policy.decode('utf-8')  
        dict_policy = json.loads(decode_policy)  
        return ak, sign, dict_policy  
  
    def __upload_token(self, policy):  
        data = json.dumps(policy, separators=(',', ':'))  
        return self.token_with_data(data)  
  
  
    @staticmethod  
    def __copy_policy(policy, to, strict_policy):  
        for k, v in policy.items():  
            if (not strict_policy) or k in _policy_fields:  
                to[k] = v

這裡有兩個很關鍵的非同步非阻塞三方庫,分別是httpx和aiofiles,對應處理網路IO和文件IO阻塞問題:

pip3 install httpx  
pip3 install aiofiles

隨後按照文檔流程通過加密方法獲取文件上傳token,這裡無須進行非同步改造,因為並不涉及IO操作:

q = Qiniu(access_key,access_secret)  
  
token = q.upload_token("空間名稱")  
  
print(token)

程式返回:

➜  mydemo git:(master) ✗ /opt/homebrew/bin/python3.10 "/Users/liuyue/wodfan/work/mydemo/src/test.py"  
q06bq54Ps5JLfZyP8Ax-qvByMBdu8AoIVJpMco2m:8RjIo9a4CxHM3009DwjbMxDzlU8=:eyJzY29wZSI6ImFkLWgyMTEyIiwiZGVhZGxpbmUiOjE2NzIzNjg2NTd9

接著添加文件流推送方法,先看官方原版邏輯:

def put_data(  
        up_token, key, data, params=None, mime_type='application/octet-stream', check_crc=False, progress_handler=None,  
        fname=None, hostscache_dir=None, metadata=None):  
    """上傳二進位流到七牛  
    Args:  
        up_token:         上傳憑證  
        key:              上傳文件名  
        data:             上傳二進位流  
        params:           自定義變數,規格參考 https://developer.qiniu.com/kodo/manual/vars#xvar  
        mime_type:        上傳數據的mimeType  
        check_crc:        是否校驗crc32  
        progress_handler: 上傳進度  
        hostscache_dir:   host請求 緩存文件保存位置  
        metadata:         元數據  
    Returns:  
        一個dict變數,類似 {"hash": "<Hash string>", "key": "<Key string>"}  
        一個ResponseInfo對象  
    """  
    final_data = b''  
    if hasattr(data, 'read'):  
        while True:  
            tmp_data = data.read(config._BLOCK_SIZE)  
            if len(tmp_data) == 0:  
                break  
            else:  
                final_data += tmp_data  
    else:  
        final_data = data  
  
    crc = crc32(final_data)  
    return _form_put(up_token, key, final_data, params, mime_type,  
                     crc, hostscache_dir, progress_handler, fname, metadata=metadata)  
  
def _form_put(up_token, key, data, params, mime_type, crc, hostscache_dir=None, progress_handler=None, file_name=None,  
              modify_time=None, keep_last_modified=False, metadata=None):  
    fields = {}  
    if params:  
        for k, v in params.items():  
            fields[k] = str(v)  
    if crc:  
        fields['crc32'] = crc  
    if key is not None:  
        fields['key'] = key  
  
    fields['token'] = up_token  
    if config.get_default('default_zone').up_host:  
        url = config.get_default('default_zone').up_host  
    else:  
        url = config.get_default('default_zone').get_up_host_by_token(up_token, hostscache_dir)  
    # name = key if key else file_name  
  
    fname = file_name  
    if not fname or not fname.strip():  
        fname = 'file_name'  
  
    # last modify time  
    if modify_time and keep_last_modified:  
        fields['x-qn-meta-!Last-Modified'] = rfc_from_timestamp(modify_time)  
  
    if metadata:  
        for k, v in metadata.items():  
            if k.startswith('x-qn-meta-'):  
                fields[k] = str(v)  
  
    r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})  
    if r is None and info.need_retry():  
        if info.connect_failed:  
            if config.get_default('default_zone').up_host_backup:  
                url = config.get_default('default_zone').up_host_backup  
            else:  
                url = config.get_default('default_zone').get_up_host_backup_by_token(up_token, hostscache_dir)  
        if hasattr(data, 'read') is False:  
            pass  
        elif hasattr(data, 'seek') and (not hasattr(data, 'seekable') or data.seekable()):  
            data.seek(0)  
        else:  
            return r, info  
        r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})  
  
    return r, info

這裡官方使用兩個方法,先試用put_data方法將字元串轉換為二進位文件流,隨後調用_form_put進行同步上傳操作,這裡_form_put這個私有方法是可復用的,既相容文件流也相容文件實體,寫法上非常值得我們借鑒,弄明白了官方原版的流程後,讓我們撰寫文件流傳輸的非同步版本:

# 上傳文件流  
    async def upload_data(self,up_token, key,data,url="http://up-z1.qiniup.com",params=None,mime_type='application/octet-stream',file_name=None,metadata=None):  
  
        data.encode('utf-8')  
          
        fields = {}  
        if params:  
            for k, v in params.items():  
                fields[k] = str(v)  
  
        if key is not None:  
            fields['key'] = key  
        fields['token'] = up_token  
  
        fname = file_name  
        if not fname or not fname.strip():  
            fname = 'file_name'  
  
        async with httpx.AsyncClient() as client:  
  
            # 調用非同步使用await關鍵字  
            res = await client.post(url,data=fields,files={'file': (fname,data,mime_type)})  
  
            print(res.text)

這裡我們聲明非同步方法upload_data,通過encode直接轉換文件流,並使用非同步httpx.AsyncClient()對象將文件流推送到官網介面地址:up-z1.qiniup.com

隨後進行測試:

import asyncio
q = qiniu_async.Qiniu("accesskey","accesssecret")  
  
token = q.upload_token("空間名稱")  
  
#文件流上傳  
asyncio.run(q.upload_data(token,"3343.txt","123測試"))

程式返回:

➜  mydemo git:(master) ✗ /opt/homebrew/bin/python3.10 "/Users/liuyue/wodfan/work/mydemo/src/test.py"  
{"hash":"FtnQXAXft5AsOH1mrmXGaRzSt-95","key":"33434.txt"}

介面會返迴文件流的hash編碼,沒有問題。

接著查看文件上傳流程:

def put_file(up_token, key, file_path, params=None,  
             mime_type='application/octet-stream', check_crc=False,  
             progress_handler=None, upload_progress_recorder=None, keep_last_modified=False, hostscache_dir=None,  
             part_size=None, version=None, bucket_name=None, metadata=None):  
    """上傳文件到七牛  
    Args:  
        up_token:                 上傳憑證  
        key:                      上傳文件名  
        file_path:                上傳文件的路徑  
        params:                   自定義變數,規格參考 https://developer.qiniu.com/kodo/manual/vars#xvar  
        mime_type:                上傳數據的mimeType  
        check_crc:                是否校驗crc32  
        progress_handler:         上傳進度  
        upload_progress_recorder: 記錄上傳進度,用於斷點續傳  
        hostscache_dir:           host請求 緩存文件保存位置  
        version:                  分片上傳版本 目前支持v1/v2版本 預設v1  
        part_size:                分片上傳v2必傳欄位 預設大小為4MB 分片大小範圍為1 MB - 1 GB  
        bucket_name:              分片上傳v2欄位 空間名稱  
        metadata:                 元數據信息  
    Returns:  
        一個dict變數,類似 {"hash": "<Hash string>", "key": "<Key string>"}  
        一個ResponseInfo對象  
    """  
    ret = {}  
    size = os.stat(file_path).st_size  
    with open(file_path, 'rb') as input_stream:  
        file_name = os.path.basename(file_path)  
        modify_time = int(os.path.getmtime(file_path))  
        if size > config.get_default('default_upload_threshold'):  
            ret, info = put_stream(up_token, key, input_stream, file_name, size, hostscache_dir, params,  
                                   mime_type, progress_handler,  
                                   upload_progress_recorder=upload_progress_recorder,  
                                   modify_time=modify_time, keep_last_modified=keep_last_modified,  
                                   part_size=part_size, version=version, bucket_name=bucket_name, metadata=metadata)  
        else:  
            crc = file_crc32(file_path)  
            ret, info = _form_put(up_token, key, input_stream, params, mime_type,  
                                  crc, hostscache_dir, progress_handler, file_name,  
                                  modify_time=modify_time, keep_last_modified=keep_last_modified, metadata=metadata)  
    return ret, info

這裡官方使用的是標準庫上下文管理器同步讀取文件,改寫為非同步方法:

# 上傳文件實體  
    async def upload_file(self,up_token,key,path,url="http://up-z1.qiniup.com",params=None,mime_type='application/octet-stream',file_name=None,metadata=None):  
  
  
        async with aiofiles.open(path, mode='rb') as f:  
            contents = await f.read()  
          
        fields = {}  
        if params:  
            for k, v in params.items():  
                fields[k] = str(v)  
  
        if key is not None:  
            fields['key'] = key  
        fields['token'] = up_token  
  
        fname = file_name  
        if not fname or not fname.strip():  
            fname = 'file_name'  
  
        async with httpx.AsyncClient() as client:  
  
            # 調用非同步使用await關鍵字  
            res = await client.post(url,data=fields,files={'file': (fname,contents,mime_type)})  
  
            print(res.text)

通過aiofiles非同步讀取文件後,在通過httpx.AsyncClient()進行非同步傳輸。

需要註意的是,這裡預設傳輸到up-z1.qiniup.com介面,如果是不同區域的雲存儲伺服器,需要更改url參數的值,具體伺服器介面列表請參照官網文檔。

至此,文件流和文件非同步傳輸就改造好了。

上傳至Python官方庫

為了方便廣大七牛雲用戶使用非同步傳輸版本庫,可以將qiniu-async上傳到Python官方庫,首先註冊成為Python官方庫的開發者:pypi.org/

隨後在項目根目錄下新建setup.py文件:

import setuptools  
import pathlib  
  
here = pathlib.Path(__file__).parent.resolve()  
long_description = (here / "README.md").read_text(encoding="utf-8")  
  
setuptools.setup(  
    name="qiniu-async",  
    version="1.0.1",  
    author="LiuYue",  
    author_email="[email protected]",  
    description="qiniu_async python library",  
    long_description=long_description,  
    long_description_content_type="text/markdown",  
    url="https://github.com/qiniu-async",  
    packages=setuptools.find_packages(),  
    license="Apache 2.0",  
    classifiers=[  
        "Development Status :: 3 - Alpha",  
        "Intended Audience :: Developers",  
        "Programming Language :: Python :: 3",  
        "Programming Language :: Python :: 3.7",  
        "Programming Language :: Python :: 3.8",  
        "Programming Language :: Python :: 3.9",  
        "Programming Language :: Python :: 3.10",  
        "Programming Language :: Python :: 3 :: Only",  
        "License :: OSI Approved :: MIT License",  
        "Operating System :: OS Independent",  
  
    ],  
    keywords="qiniu, qiniu_async, async",  
    py_modules=[  
        'qiniu_async'  
    ],  
    install_requires=["aiofiles","httpx"],  
)

這是安裝文件,主要為了聲明該模塊的名稱、作者、版本以及依賴庫。

隨後本地打包文件:

python3 setup.py sdist

程式會根據setup.py文件生成壓縮包:

➜  qiniu_async tree  
.  
├── README.md  
├── dist  
│ └── qiniu-async-1.0.1.tar.gz  
├── https:  
│ └── github.com  
│     └── zcxey2911  
│         └── qiniu-async.git  
├── qiniu_async.egg-info  
│ ├── PKG-INFO  
│ ├── SOURCES.txt  
│ ├── dependency_links.txt  
│ ├── requires.txt  
│ └── top_level.txt  
├── qiniu_async.py  
└── setup.py

接著安裝twine庫, 準備提交Python官網:

pip3 install twine

隨後在根目錄運行命令提交:

twine upload dist/*

在官網進行查看:https://pypi.org/project/qiniu-async/

隨後本地就可以直接通過pip命令句進行安裝了:

pip install qiniu-async -i https://pypi.org/simple

非常方便。

結語

雲端存儲,非同步加持,猛虎添翼,未敢擁缽獨饗,除了通過pip安裝qiniu-async庫,也奉上Github項目地址:https://github.com/zcxey2911/qiniu-async ,與眾鄉親同饗。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 談起消息隊列,內心還是會有些波瀾。 消息隊列、緩存、分庫分表是高併發解決方案三劍客,而消息隊列是我最喜歡,也是思考最多的技術。我想按照下麵的四個階段分享我與消息隊列的故事,同時也是對我技術成長經歷的回顧。 ...
  • C語言 我們在學習電腦學科時,往往最先接觸到的編程語言是C,它是所有語言中,最接近底層的高級語言之一,因而它具有執行速度快的優點。但它又具有開發周期長和對於經驗不足的開發者極容易犯錯的缺點。C語言應用範圍廣泛,你幾乎可以在任何場景中看到它的影子。 C語言編譯原理 一個編寫好的C代碼經過編譯成可執行 ...
  • jdk安裝 下載jdk 由於現在主流就是jdk1.8,所以這裡就下載jdk1.8進行演示。官方下載地址:https://www.oracle.com/java/technologies/downloads/#java8-windows。 官方下載需要註冊oracle賬號,國內下載有可能速度慢,若不想 ...
  • 題目來源 400. 第 N 位數字 題目詳情 給你一個整數 n ,請你在無限的整數序列 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, ...] 中找出並返回第 n 位上的數字。 示例 1: 輸入: n = 3 輸出: 3 示例 2: 輸入: n = 11 輸出: 0 解釋: ...
  • #增強for迴圈 增強for迴圈 (也稱for each迴圈) 是迭代器遍歷方法的一個“簡化版”,是JDK1.5以後出來的一個高級for迴圈,專門用來遍曆數組和集合。 普通for迴圈 int[] num = {1,2,3,4,5,6}; for(int i = 0 ; i<num.length ; ...
  • RocketMQ 優異的性能表現,必然繞不開其優秀的存儲模型 。 這篇文章,筆者按照自己的理解 , 嘗試分析 RocketMQ 的存儲模型,希望對大家有所啟發。 1 整體概覽 首先溫習下 RocketMQ 架構。 整體架構中包含四種角色 : Producer :消息發佈的角色,Producer 通過 ...
  • JZ74 和為S的連續正數序列 題目 小明很喜歡數學,有一天他在做數學作業時,要求計算出9~16的和,他馬上就寫出了正確答案是100。 但是他並不滿足於此,他在想究竟有多少種連續的正數序列的和為100(至少包括兩個數)。 沒多久,他就得到另一組連續正數和為100的序列:18,19,20,21,22。 ...
  • 1. C++常量表達式 constexpr 是 C++ 11 標準新引入的關鍵字,在學習其具體用法和功能之前,我們需要先搞清楚 C++ 常量表達式的含義。 所謂常量表達式,指的就是由多個(≥1)常量組成的表達式。換句話說,如果表達式中的成員都是常量,那麼該表達式就是一個常量表達式。這也意味著,常量表 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...