Django文件存儲(二)定製存儲系統

来源:http://www.cnblogs.com/linxiyue/archive/2017/09/03/7456004.html
-Advertisement-
Play Games

要自己寫一個存儲系統,可以依照以下步驟: 1.寫一個繼承自django.core.files.storage.Storage的子類。 2.Django必須可以在無任何參數的情況下實例化MyStorage,所以任何環境設置必須來自django.conf.settings。 3.根據Storage的op ...


要自己寫一個存儲系統,可以依照以下步驟:

1.寫一個繼承自django.core.files.storage.Storage的子類。

from django.core.files.storage import Storage

class MyStorage(Storage):
    ...

2.Django必須可以在無任何參數的情況下實例化MyStorage,所以任何環境設置必須來自django.conf.settings。

from django.conf import settings
from django.core.files.storage import Storage

class MyStorage(Storage):
    def __init__(self, option=None):
        if not option:
            option = settings.CUSTOM_STORAGE_OPTIONS
        ...

3.根據Storage的open和save方法源碼:

    def open(self, name, mode='rb'):
        """
        Retrieves the specified file from storage.
        """
        return self._open(name, mode)


    def save(self, name, content, max_length=None):
        """
        Saves new content to the file specified by name. The content should be
        a proper File object or any python file-like object, ready to be read
        from the beginning.
        """
        # Get the proper name for the file, as it will actually be saved.
        if name is None:
            name = content.name

        if not hasattr(content, 'chunks'):
            content = File(content, name)

        name = self.get_available_name(name, max_length=max_length)
        return self._save(name, content)

MyStorage需要實現_open和_save方法。

如果寫的是個本地存儲系統,還要重寫path方法。

4.使用django.utils.deconstruct.deconstructible裝飾器,以便在migration可以序列化。

還有,Storage.delete()、Storage.exists()、Storage.listdir()、Storage.size()、Storage.url()方法都會報NotImplementedError,也需要重寫。

Django Qiniu Storage

七牛雲有自己的django storage系統,可以看下是怎麼運作的,地址https://github.com/glasslion/django-qiniu-storage

先在環境變數或者settings中配置QINIU_ACCESS_KEY、QINIU_SECRET_KEY、QINIU_BUCKET_NAME、QINIU_BUCKET_DOMAIN、QINIU_SECURE_URL。

使用七牛雲托管用戶上傳的文件,在 settings.py 里設置DEFAULT_FILE_STORAGE:

DEFAULT_FILE_STORAGE = 'qiniustorage.backends.QiniuStorage'

使用七牛托管動態生成的文件以及站點自身的靜態文件,設置:

STATICFILES_STORAGE  = 'qiniustorage.backends.QiniuStaticStorage'

運行python manage.py collectstatic,靜態文件就會被統一上傳到七牛。

QiniuStorage代碼如下:

@deconstructible
class QiniuStorage(Storage):
    """
    Qiniu Storage Service
    """
    location = ""

    def __init__(
            self,
            access_key=QINIU_ACCESS_KEY,
            secret_key=QINIU_SECRET_KEY,
            bucket_name=QINIU_BUCKET_NAME,
            bucket_domain=QINIU_BUCKET_DOMAIN,
            secure_url=QINIU_SECURE_URL):

        self.auth = Auth(access_key, secret_key)
        self.bucket_name = bucket_name
        self.bucket_domain = bucket_domain
        self.bucket_manager = BucketManager(self.auth)
        self.secure_url = secure_url

    def _clean_name(self, name):
        """
        Cleans the name so that Windows style paths work
        """
        # Normalize Windows style paths
        clean_name = posixpath.normpath(name).replace('\\', '/')

        # os.path.normpath() can strip trailing slashes so we implement
        # a workaround here.
        if name.endswith('/') and not clean_name.endswith('/'):
            # Add a trailing slash as it was stripped.
            return clean_name + '/'
        else:
            return clean_name

    def _normalize_name(self, name):
        """
        Normalizes the name so that paths like /path/to/ignored/../foo.txt
        work. We check to make sure that the path pointed to is not outside
        the directory specified by the LOCATION setting.
        """

        base_path = force_text(self.location)
        base_path = base_path.rstrip('/')

        final_path = urljoin(base_path.rstrip('/') + "/", name)

        base_path_len = len(base_path)
        if (not final_path.startswith(base_path) or
                final_path[base_path_len:base_path_len + 1] not in ('', '/')):
            raise SuspiciousOperation("Attempted access to '%s' denied." %
                                      name)
        return final_path.lstrip('/')

    def _open(self, name, mode='rb'):
        return QiniuFile(name, self, mode)

    def _save(self, name, content):
        cleaned_name = self._clean_name(name)
        name = self._normalize_name(cleaned_name)

        if hasattr(content, 'chunks'):
            content_str = b''.join(chunk for chunk in content.chunks())
        else:
            content_str = content.read()

        self._put_file(name, content_str)
        return cleaned_name

    def _put_file(self, name, content):
        token = self.auth.upload_token(self.bucket_name)
        ret, info = put_data(token, name, content)
        if ret is None or ret['key'] != name:
            raise QiniuError(info)

    def _read(self, name):
        return requests.get(self.url(name)).content

    def delete(self, name):
        name = self._normalize_name(self._clean_name(name))
        if six.PY2:
            name = name.encode('utf-8')
        ret, info = self.bucket_manager.delete(self.bucket_name, name)

        if ret is None or info.status_code == 612:
            raise QiniuError(info)

    def _file_stat(self, name, silent=False):
        name = self._normalize_name(self._clean_name(name))
        if six.PY2:
            name = name.encode('utf-8')
        ret, info = self.bucket_manager.stat(self.bucket_name, name)
        if ret is None and not silent:
            raise QiniuError(info)
        return ret

    def exists(self, name):
        stats = self._file_stat(name, silent=True)
        return True if stats else False

    def size(self, name):
        stats = self._file_stat(name)
        return stats['fsize']

    def modified_time(self, name):
        stats = self._file_stat(name)
        time_stamp = float(stats['putTime']) / 10000000
        return datetime.datetime.fromtimestamp(time_stamp)

    def listdir(self, name):
        name = self._normalize_name(self._clean_name(name))
        if name and not name.endswith('/'):
            name += '/'

        dirlist = bucket_lister(self.bucket_manager, self.bucket_name,
                                prefix=name)
        files = []
        dirs = set()
        base_parts = name.split("/")[:-1]
        for item in dirlist:
            parts = item['key'].split("/")
            parts = parts[len(base_parts):]
            if len(parts) == 1:
                # File
                files.append(parts[0])
            elif len(parts) > 1:
                # Directory
                dirs.add(parts[0])
        return list(dirs), files

    def url(self, name):
        name = self._normalize_name(self._clean_name(name))
        name = filepath_to_uri(name)
        protocol = u'https://' if self.secure_url else u'http://'
        return urljoin(protocol + self.bucket_domain, name)

配置是從環境變數或者settings.py中獲得的:

def get_qiniu_config(name, default=None):
    """
    Get configuration variable from environment variable
    or django setting.py
    """
    config = os.environ.get(name, getattr(settings, name, default))
    if config is not None:
        if isinstance(config, six.string_types):
            return config.strip()
        else:
            return config
    else:
        raise ImproperlyConfigured(
            "Can't find config for '%s' either in environment"
            "variable or in setting.py" % name)


QINIU_ACCESS_KEY = get_qiniu_config('QINIU_ACCESS_KEY')
QINIU_SECRET_KEY = get_qiniu_config('QINIU_SECRET_KEY')
QINIU_BUCKET_NAME = get_qiniu_config('QINIU_BUCKET_NAME')
QINIU_BUCKET_DOMAIN = get_qiniu_config('QINIU_BUCKET_DOMAIN', '').rstrip('/')
QINIU_SECURE_URL = get_qiniu_config('QINIU_SECURE_URL', 'False')

 重寫了_open和_save方法:

    def _open(self, name, mode='rb'):
        return QiniuFile(name, self, mode)

    def _save(self, name, content):
        cleaned_name = self._clean_name(name)
        name = self._normalize_name(cleaned_name)

        if hasattr(content, 'chunks'):
            content_str = b''.join(chunk for chunk in content.chunks())
        else:
            content_str = content.read()

        self._put_file(name, content_str)
        return cleaned_name

使用的put_data方法上傳文件,相關代碼如下:

def put_data(
        up_token, key, data, params=None, mime_type='application/octet-stream', check_crc=False, progress_handler=None,
        fname=None):
    """上傳二進位流到七牛

    Args:
        up_token:         上傳憑證
        key:              上傳文件名
        data:             上傳二進位流
        params:           自定義變數,規格參考 http://developer.qiniu.com/docs/v6/api/overview/up/response/vars.html#xvar
        mime_type:        上傳數據的mimeType
        check_crc:        是否校驗crc32
        progress_handler: 上傳進度

    Returns:
        一個dict變數,類似 {"hash": "<Hash string>", "key": "<Key string>"}
        一個ResponseInfo對象
    """
    crc = crc32(data) if check_crc else None
    return _form_put(up_token, key, data, params, mime_type, crc, progress_handler, fname)

def _form_put(up_token, key, data, params, mime_type, crc, progress_handler=None, file_name=None):
    fields = {}
    if params:
        for k, v in params.items():
            fields[k] = str(v)
    if crc:
        fields['crc32'] = crc
    if key is not None:
        fields['key'] = key

    fields['token'] = up_token
    url = config.get_default('default_zone').get_up_host_by_token(up_token) + '/'
    # name = key if key else file_name

    fname = file_name
    if not fname or not fname.strip():
        fname = 'file_name'

    r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})
    if r is None and info.need_retry():
        if info.connect_failed:
            url = config.get_default('default_zone').get_up_host_backup_by_token(up_token) + '/'
        if hasattr(data, 'read') is False:
            pass
        elif hasattr(data, 'seek') and (not hasattr(data, 'seekable') or data.seekable()):
            data.seek(0)
        else:
            return r, info
        r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})

    return r, info


def _post_file(url, data, files):
    return _post(url, data, files, None)

def _post(url, data, files, auth, headers=None):
    if _session is None:
        _init()
    try:
        post_headers = _headers.copy()
        if headers is not None:
            for k, v in headers.items():
                post_headers.update({k: v})
        r = _session.post(
            url, data=data, files=files, auth=auth, headers=post_headers,
            timeout=config.get_default('connection_timeout'))
    except Exception as e:
        return None, ResponseInfo(None, e)
    return __return_wrapper(r)


def _init():
    session = requests.Session()
    adapter = requests.adapters.HTTPAdapter(
        pool_connections=config.get_default('connection_pool'), pool_maxsize=config.get_default('connection_pool'),
        max_retries=config.get_default('connection_retries'))
    session.mount('http://', adapter)
    global _session
    _session = session

最終使用的是requests庫上傳文件的,統一適配了鏈接池個數、鏈接重試次數。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 寫在前面整個項目都托管在了 Github 上:https://github.com/ikesnowy/Algorithms-4th-Edition-in-Csharp這一節內容可能會用到的庫文件有 Measurement 和 TestCase,同樣在 Github 上可以找到。善用 Ctrl + F... ...
  • 平常經常用到虛擬機,每次從gho轉換為vmdk時都要輸入cmd代碼,覺得麻煩,自己動手做了個gho2vmdk轉換工具,集成ghost32.exe文件,可以一鍵轉換,省時省事。運行時會將ghost32.exe保存到Program FIles文件夾里,運行完自動刪除ghost32.exe。覺得還不錯,在 ...
  • 今天又到了搶火車票的時候,反正是每次搶票都是傻眼。於是寫個小工具幫助自己查詢火車票,如果有票的話給自己發個郵件提示購買。 一、準備工作 利用firebug等工具,我們可以獲取到當我們單擊查詢時調用的Get請求。 請求地址: https://kyfw.12306.cn/otn/leftTicket/q ...
  • 在很多場合,我們需要線上編輯HTML內容,然後在頁面上或者其他終端上(如小程式、APP應用等)顯示,編輯HTML內容的插件有很多,本篇介紹基於Bootstrap的 summernote插件實現HTML文檔的編輯和圖片插入操作,這個控制項的使用非常方便,並且用戶群也很大。 ...
  • 郵件的內容其實是就HTML,傳統的做法都是通過在程式中拼接字元串來生成郵件的內容,生成困難,維護也困難。Razor是MVC裡面使用的視圖引擎,用來生成HTML非常方便,ZKEACMS中就是使用了Razor視圖引擎,用cshtml作為郵件模板來生成郵件內容。這樣很方便維護和修改。 ...
  • using System;using System.Drawing;using System.Windows.Forms; namespace 案例演示{ public partial class frmlogo : Form { public frmlogo() { InitializeCompo ...
  • 總結和開始記錄實驗的現象,調試代碼的經驗,同時開始用博客記錄一些技術上的進步 2017-09-01 09:01:02 1、 picturebox 上 進行覆蓋,移動,大小改變,都會觸發控制項重繪事件,重繪其本身和其所有的子控制項, 所以以繪圖區域為父容器的pictureBox在程式運行時進行的任意操作只 ...
  • python的文件操縱方法: file.readline() 讀取下一行文件,返回含有內容的字元串 file.readlines() 讀取整個文件,返回一個字元串列表 file.read() 讀取整個文件,返回一個字元串 f = open("filename","mode") 打開一個文件,mode ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...