MongoDB Python官方驅動 PyMongo 的簡單封裝類 DBManager。主要特性:對資料庫和集合的操作確保其存在性;支持PyMongo的原生操作,包括基本的CRUD操作、批量操作、MapReduce、多線程和多進程等;支持因果一致性會話和事務的流水線操作,並給出簡單示例。 ...
最近,需要使用 Python 對 MongodB 做一些簡單的操作,不想使用各種繁重的框架。出於可重用性的考慮,想對 MongoDB Python 官方驅動 PyMongo 做下簡單封裝,百度一如既往的未能給我一個滿意的結果,於是有了下文。
【正文】
PyMongo,MongoDB Python官方驅動
- docs: https://api.mongodb.com/python/current/index.html
- github: https://github.com/mongodb/mongo-python-driver
PyMongo 驅動幾乎支持 MongoDB 的全部特性,可以連接單個的 MongoDB 資料庫、副本集和分片集群。從提供的API角度來看,pymongo package是其核心,包含對資料庫的各種操作。本文將介紹一個簡單封裝類 DBManager。主要特性:對資料庫和集合的操作確保其存在性;支持PyMongo的原生操作,包括基本的CRUD操作、批量操作、MapReduce、多線程和多進程等;支持因果一致性會話和事務的流水線操作,並給出簡單示例。
MongoClient
mongo_client 提供了連接 MongoDB 的MongoClient類:
class pymongo.mongo_client.MongoClient(host='localhost', port=27017, document_class=dict, tz_aware=False, connect=True, **kwargs)
每個 MongoClient 實例 client (下文簡稱 client)都維護一個內建的連接池,預設 maxPoolsize 大小100。對於多線程的操作,連接池會給每一個線程一個 socket 連接,直到達到最大的連接數,後續的線程會阻塞以等待有可用的連接被釋放。client 對 MongoDB 拓撲結構中的每個server 還維護一個額外的連接來監聽 server 的狀態。
下麵的 new_mongo_client
函數用於獲取一個資料庫連接的 client。其中,client.admin.command('ismaster')
用來檢查 server 的可用狀態,簡單省事不需要認證。
def new_mongo_client(uri, **kwargs):
"""Create new pymongo.mongo_client.MongoClient instance. DO NOT USE IT DIRECTLY."""
try:
client = MongoClient(uri, maxPoolSize=1024, **kwargs)
client.admin.command('ismaster') # The ismaster command is cheap and does not require auth.
except ConnectionFailure:
logging.error("new_mongo_client(): Server not available, Please check you uri: {}".format(uri))
return None
else:
return client
PyMongo 不是進程(fork-safe)安全的,但在一個進程中是線程安全(thread-safe)的。因此常見的場景是,對於一個MongoDB 環境,為每一個進程中創建一個 client ,後面所有的資料庫操作都使用這一個實例,包括多線程操作。永遠不要為每一次操作都創建一個 MongoClient 實例,使用完後調用 MongoClient.close() 方法,這樣沒有必要而且會非常浪費性能。
鑒於以上原因,一般不宜直接使用new_mongo_client
函數獲取 client,而是進一步封裝為get_mongo_client
方法。 其中全局常量 URI_CLIENT_DICT
保持著資料庫 URI 字元串與對應 clinet 的字典,一個 URI 對應一個 client 。代碼如下:
MONGO_URI_DEFAULT = 'mongodb://localhost:27017/admin'
URI_CLIENT_DICT = {} # a dictionary hold all client with uri as key
def get_mongo_client(uri=MONGO_URI_DEFAULT, fork=False, **kwargs):
"""Get pymongo.mongo_client.MongoClient instance. One mongodb uri, one client.
@:param uri: mongodb uri
@:param fork: for fork-safe in multiprocess case, if fork=True, return a new MongoClient instance, default False.
@:param kwargs: refer to pymongo.mongo_client.MongoClient kwargs
"""
if fork:
return new_mongo_client(uri, **kwargs)
global URI_CLIENT_DICT
matched_client = URI_CLIENT_DICT.get(uri)
if matched_client is None: # no matched client
new_client = new_mongo_client(uri, **kwargs)
if new_client is not None:
URI_CLIENT_DICT[uri] = new_client
return new_client
return matched_client
確保 Database 和 Collection 的存在
PyMongo 有個特性:對於不存在的資料庫、集合上的查詢不會報錯。如下,Ipython中演示在不存在xxDB 資料庫和 xxCollection 集合上的操作:
In [1]: from pymongo import MongoClient
In [2]: client = MongoClient() # default uri is 'mongodb://localhost:27017/admin'
In [3]: db = client.get_database('xxDB') # Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), u'xxDB')
In [4]: coll = db.get_collection('XXCollection') # Collection(Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), u'xxDB'), u'XXCollection')
In [5]: coll.find_one() # note: no tip, no error, no exception, return None
In [6]: coll.insert_one({'hello' : 'what a fucking feature'})
Out[6]: <pymongo.results.InsertOneResult at 0x524ccc8>
In [7]: coll.find_one()
Out[7]: {u'_id': ObjectId('5c31c807bb048515b814d719'), u'hello': u'what a fucking feature'}
這對於手誤寫錯資料庫或集合名字後進行的後續操作,簡直就是災難。鑒於此因,有必要對獲取資料庫或集合時加上確認保護。
下麵對於獲取資料庫,使用 MongoClient.list_database_names() 獲取所有的資料庫名字,如果資料庫名稱不在其中,則返回None。同樣的道理,對於集合使用 Database.list_collection_names()。註:由於用戶許可權問題造成的獲取資料庫或集合列表的操作報錯的情況,預設不加確認保護。
def get_existing_db(client, db_name):
"""Get existing pymongo.database.Database instance.
@:param client: pymongo.mongo_client.MongoClient instance
@:param db_name: database name wanted
"""
if client is None:
logging.error('client {} is None'.format(client))
return None
try:
db_available_list = client.list_database_names()
except PyMongoError as e:
logging.error('client: {}, db_name: {}, client.list_database_names() error: {}'.
format(client, db_name, repr(e)))
else:
if db_name not in db_available_list:
logging.error('client {} has no db named {}'.format(client, db_name))
return None
db = client.get_database(db_name)
return db
def get_existing_coll(db, coll_name):
"""Get existing pymongo.collection.Collection instance.
@:param client: pymongo.mongo_client.MongoClient instance
@:param coll_name: collection name wanted
"""
if db is None:
logging.error('db {} is None'.format(db))
return None
try:
coll_available_list = db.list_collection_names()
except PyMongoError as e:
logging.error('db: {}, coll_name: {}, db.list_collection_names() error: {}'.
format(db, coll_name, repr(e)))
else:
if coll_name not in coll_available_list:
logging.error('db {} has no collection named {}'.format(db, coll_name))
return None
coll = db.get_collection(coll_name)
return coll
PyMongo 封裝類 DBManger
前文的冗長鋪墊主要是為了引入這個 PyMongo 驅動封裝類 DBManger。
DBManger 類的實例保持的狀態有MongoClient實例 self.client
, 資料庫self.db
和 集合self.coll
,並通過屬性(property)對外開放。PyMongo 原生的方法對這裡的 client, db 和 coll 同樣適用。client 由類的構造器調用上文的get_mongo_client
方法獲取,db 和 coll 即可通過類的構造器獲取也可通過 self.db_name
和 self.coll_name
這些 setter 來切換。
DBManger 類的實例持有的方法 self.create_coll(self, db_name, coll_name)
, session_pipeline(self, pipeline)
和 transaction_pipeline(self, pipeline)
。後兩種方法在下一節再具體解釋。
class DBManager:
"""A safe and simple pymongo packaging class ensuring existing database and collection.
Operations:
MongoClient level operations: https://api.mongodb.com/python/current/api/pymongo/mongo_client.html
Database level operations: https://api.mongodb.com/python/current/api/pymongo/database.html
Collection level operations: https://api.mongodb.com/python/current/api/pymongo/collection.html
"""
__default_uri = 'mongodb://localhost:27017/admin'
__default_db_name = 'test'
__default_coll_name = 'test'
def __init__(self, uri=__default_uri, db_name=__default_db_name, coll_name=__default_coll_name, **kwargs):
self.__uri = uri
self.__db_name = db_name
self.__coll_name = coll_name
self.__client = get_mongo_client(uri, **kwargs)
self.__db = get_existing_db(self.__client, db_name)
self.__coll = get_existing_coll(self.__db, coll_name)
def __str__(self):
return u'uri: {}, db_name: {}, coll_name: {}, id_client: {}, client: {}, db: {}, coll: {}'.format(
self.uri, self.db_name, self.coll_name, id(self.client), self.client, self.db, self.coll)
@property
def uri(self):
return self.__uri
@property
def db_name(self):
return self.__db_name
@property
def coll_name(self):
return self.__coll_name
@db_name.setter
def db_name(self, db_name):
self.__db_name = db_name
self.__db = get_existing_db(self.__client, db_name)
@coll_name.setter
def coll_name(self, coll_name):
self.__coll_name = coll_name
self.__coll = get_existing_coll(self.__db, coll_name)
@property
def client(self):
return self.__client
@property
def db(self):
return self.__db
@property
def coll(self):
# always use the current instance self.__db
self.__coll = get_existing_coll(self.__db, self.__coll_name)
return self.__coll
def create_coll(self, db_name, coll_name):
"""Create new collection with new or existing database"""
if self.__client is None:
return None
try:
return self.__client.get_database(db_name).create_collection(coll_name)
except CollectionInvalid:
logging.error('collection {} already exists in database {}'.format(coll_name, db_name))
return None
def session_pipeline(self, pipeline):
if self.__client is None:
logging.error('client is None in session_pipeline: {}'.format(self.__client))
return None
with self.__client.start_session(causal_consistency=True) as session:
result = []
for operation in pipeline:
try:
if operation.level == 'client':
target = self.__client
elif operation.level == 'db':
target = self.__db
elif operation.level == 'coll':
target = self.__coll
operation_name = operation.operation_name
args = operation.args
kwargs = operation.kwargs
operator = getattr(target, operation_name)
if type(args) == tuple:
ops_rst = operator(*args, session=session, **kwargs)
else:
ops_rst = operator(args, session=session, **kwargs)
if operation.callback is not None:
operation.out = operation.callback(ops_rst)
else:
operation.out = ops_rst
except Exception as e:
logging.error('{} {} Exception, session_pipeline args: {}, kwargs: {}'.format(
target, operation, args, kwargs))
logging.error('session_pipeline Exception: {}'.format(repr(e)))
result.append(operation)
return result
# https://api.mongodb.com/python/current/api/pymongo/client_session.html#transactions
def transaction_pipeline(self, pipeline):
if self.__client is None:
logging.error('client is None in transaction_pipeline: {}'.format(self.__client))
return None
with self.__client.start_session(causal_consistency=True) as session:
with session.start_transaction():
result = []
for operation in pipeline:
try:
if operation.level == 'client':
target = self.__client
elif operation.level == 'db':
target = self.__db
elif operation.level == 'coll':
target = self.__coll
operation_name = operation.operation_name
args = operation.args
kwargs = operation.kwargs
operator = getattr(target, operation_name)
if type(args) == tuple:
ops_rst = operator(*args, session=session, **kwargs)
else:
ops_rst = operator(args, session=session, **kwargs)
if operation.callback is not None:
operation.out = operation.callback(ops_rst)
else:
operation.out = ops_rst
except Exception as e:
logging.error('{} {} Exception, transaction_pipeline args: {}, kwargs: {}'.format(
target, operation, args, kwargs))
logging.error('transaction_pipeline Exception: {}'.format(repr(e)))
raise Exception(repr(e))
result.append(operation)
return result
這裡給出一些例子來說明 DBManager的使用方法。
- 創建集合、切換資料庫或集合:
# get DBManger instance
var dbm = DBManager('mongodb://localhost:27017/admin') # db_name, coll_name default 'test'
dbm.create_coll('testDB', 'testCollection')
# change db or coll
dbm.db_name = 'testDB' # dbm.db (test -> testDB) and dbm.coll (test.testCollection-> testDB.testCollection) will be changed at the same time
dbm.coll_nmae = 'testCollection' # dbm.coll (test.test-> test.testCollection) will be change at the same time
- 基本的操作,CRUD:
# simple manipulation operation
dbm.coll.insert_one({'hello': 'world'})
print(dbm.coll.find_one()) # {'_id': ObjectId('...'), 'hello': 'world'}
dbm.coll.update_one({'hello': 'world'}, {'hello': 'hell'})
# bulk operation
from pymongo import InsertOne, DeleteOne, ReplaceOne, ReplaceOne
dbm.coll.bulk_write([InsertOne({'y':1}), DeleteOne({'x':1}), ReplaceOne({{'w':1}, {'z':1}, upsert=True})])
# simple managing operation
import pymongo
dbm.coll.create_index([('hello', pymongo.DESCENDING)], background=True)
dbm.client.list_database_names()
dbm.db.list_collection_names()
- 線程併發,進程並行:
# thread concurrent
import threading
def fun(uri, db_name, coll_name):
# new DBManager instance avoid variable competition
dbm = DBManager(uri, db_name, coll_name)
pass
t = threading.Thread(target=func, args=(uri, db_name, coll_name))
t.start()
# multiprocess parallel
import multiprocessing
def func(uri, db_name, coll_name):
# new process, new client with fork=True parameter, and new DBManager instance.
dbm = DBManager(uri, db_name, coll_name, fork=True)
# Do something with db.
pass
proc = multiprocessing.Process(target=func, args=(uri, db_name, coll_name))
proc.start()
- MapReduce :
# MapReduce
from bson.code import Code
mapper = Code('''
function () {...}
''')
reducer = Code('''
function (key, value) {...}
''')
rst = dbm.coll.inline_map_reduce(mapper, reducer)
對 MongoDB 一致性會話(session)和 事務(transaction)的支持
MongoDB Reference
- docs: https://docs.mongodb.com/manual/
- causal-consistency session: https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#causal-consistency
- transation: https://docs.mongodb.com/manual/core/transactions/#transactions
會話(session),是對資料庫連接的一種邏輯表示。從MongoDB 3.6開始,MongoDB引入了客戶端會話(client session),併在其中加入了對操作的因果一致性(causal-consistency)的支持。因此,更準確地說,這裡 DBManger 類封裝的其實是因果一致性的會話,即client.start_session(causal_consistency=True)
。不過,一致性能夠保證的前提是客戶端的應用應保證在一個會話中只有一個線程(thread)在做這些操作。在一個客戶端會話中,多個順序的讀寫操作得到的結果與它們的執行順序將是因果一致的,讀寫的設置都自動設為 "majority"。應用場景:先寫後讀,先讀後寫,一致性的寫,一致性的讀(Read your writes,Writes follow reads,Monotonic writes, Monotonic reads)。客戶端會話與服務端會話(server session)進行交互。從3.6版本開始,MongoDB驅動將所有的操作都關聯到服務端會話。服務端會話是客戶端會話順序操作因果一致性和重試寫操作的得以支持的底層框架。
MongoDB 對單個文檔的操作時是原子性的(atomic)。原子性是指一個操作的結果要麼有要麼沒有,不可再切割,換句話說叫 “all or nothing”。從MongoDB 4.0開始,副本集(Replica set)開始支持多個文檔級別的原子性,即多文檔事務(muti-document transaction)。在同一個事務中,對跨越不同資料庫或集合下的多個文檔操作,如果全部操作成功,則該事務被成功提交(commit);如果某些操作出現失敗,則整個事務會終止(abort),操作中對資料庫的改動會被丟棄。只有在事務被成功提交之後,操作的結果才能被事務外看到,事務正在進行或者事務失敗,其中的操作對外都不可見。單個mongod服務和分片集群(sharded cluster)暫不支持事務。MongoDB官方預計在4.2版本左右對分片集群加入對事務的支持。另外,需要註意的是,多文檔事務會引入更大的性能開銷,在場景允許的情況下,儘可能考慮用嵌套文檔或數組的單文檔操作方式來解決問題。
會話和事務的主要應用場景其實都是多個的時序性操作,即流水線形式。因此 DBManager 加入了session_pipeline(self, pipeline)
和 transaction_pipeline(self, pipeline)
的操作方法。首先引入表徵操作的類Operation,描述一個操作作用的層次(client, db或coll)、操作方法、參數和操作結果需要調用的回調函數,見名知意,不再贅解。多個操作 Operation 類的實例構成的list 為pipeline, 作為session_pipeline(self, pipeline)
和 transaction_pipeline(self, pipeline)
的輸入參數。pipeline 操作的每一步的輸出會寫入到對應Operation 類的實例的out屬性中。
class Operation:
"""Operation for constructing sequential pipeline. Only used in DBManager.session_pipeline() or transaction_pipeline().
Constructor parameters:
level: <'client' | 'db' | 'coll'> indicating different operation level, MongoClient, Database, Collection
operation_name: Literally, the name of operation on specific level
args: position arguments the operation need. Require the first parameter or a tuple of parameters of the operation.
kwargs: key word arguments the operation need.
callback: callback function for operation result
Examples:
# pymongo.collection.Collection.find(filter, projection, skip=None, limit=None,...)
Operation('coll', 'find', {'x': 5}) only filter parameter, equivalent to:
Operation('coll', 'find', args={'x': 5}) or Operation('coll', 'find', kwargs={filter: {'x': 5}})
Operation('coll', 'find', ({'x': 5},{'_id': 0}) {'limit':100}), equivalent to:
Operation('coll', 'find', args=({'x': 5},{'_id': 0}, None, {'limit':100}) ), OR
Operation('coll', 'find', kwargs={'filter':{'x': 5}, 'projection': {'_id': 0},'limit':100})
def cursor_callback(cursor):
return cursor.distinct('hello')
Operation('coll', 'find', kwargs={'limit': 2}, callback=cursor_callback)
"""
def __init__(self, level, operation_name, args=(), kwargs={}, callback=None):
self.level = level
self.operation_name = operation_name
self.args = args
if kwargs is None:
self.kwargs = None
else:
self.kwargs = kwargs
self.callback = callback
self.out = None
基於 DBManager 和 Operation 的因果一致性的會話和事務的簡單示例如下:
# causal-consistency session or transaction pipeline operation
def cursor_callback(cursor):
return cursor.distinct('hello')
op_1 = Operation('coll', 'insert_one', {'hello': 'heaven'})
op_2 = Operation('coll', 'insert_one', {'hello': 'hell'})
op_3 = Operation('coll', 'insert_one', {'hello': 'god'})
op_4 = Operation('coll', 'find', kwargs={'limit': 2}, callback=cursor_callback)
op_5 = Operation('coll', 'find_one', {'hello': 'god'})
pipeline = [op_1, op_2, op_3, op_4, op_5]
ops = dbm.transaction_pipeline(pipeline) # only on replica set deployment
# ops = dbm.session_pipeline(pipeline) # can be standalone, replica set or sharded cluster.
for op in ops:
print(op.out)
【正文完】
註:內容同步自同名CSDN博客:https://blog.csdn.net/fzlulee/article/details/85944967