pymsql鏈接資料庫 資料庫連接池版 方式一: 為每個線程創建一個連接,線程即使調用了close方法,也不會關閉,只是把連接重新放到連接池,供自己線程再次使用。當線程終止時,連接自動關閉 方式二: 模式二:創建一批連接到連接池,供所有線程共用使用 setting.py utils/sql.py 使 ...
pymsql鏈接資料庫
import pymysql
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123456', db='s8day127db')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# cursor.execute("select id,name from users where name=%s and pwd=%s",['lqz','123',])
cursor.execute("select id,name from users where name=%(user)s and pwd=%(pwd)s",{'user':'lqz','pwd':'123'})
obj = cursor.fetchone()
conn.commit()
cursor.close()
conn.close()
print(obj)
資料庫連接池版
方式一:
為每個線程創建一個連接,線程即使調用了close方法,也不會關閉,只是把連接重新放到連接池,供自己線程再次使用。當線程終止時,連接自動關閉
from DBUtils.PersistentDB import PersistentDB
import pymysql
POOL = PersistentDB(
creator=pymysql, # 使用鏈接資料庫的模塊
maxusage=None, # 一個鏈接最多被重覆使用的次數,None表示無限制
setsession=[], # 開始會話前執行的命令列表。
ping=0,
# ping MySQL服務端,檢查是否服務可用。
closeable=False,
# 如果為False時, conn.close() 實際上被忽略,供下次使用,再線程關閉時,才會自動關閉鏈接。如果為True時, conn.close()則關閉鏈接,那麼再次調用pool.connection時就會報錯,因為已經真的關閉了連接(pool.steady_connection()可以獲取一個新的鏈接)
threadlocal=None, # 本線程獨享值得對象,用於保存鏈接對象,如果鏈接對象被重置
host='127.0.0.1',
port=3306,
user='root',
password='123456',
database='test',
charset='utf8'
)
def func():
conn = POOL.connection(shareable=False)
cursor = conn.cursor()
cursor.execute('select * from user')
result = cursor.fetchall()
print(result)
cursor.close()
conn.close()
if __name__ == '__main__':
func()
方式二:
模式二:創建一批連接到連接池,供所有線程共用使用
setting.py
from datetime import timedelta
from redis import Redis
import pymysql
from DBUtils.PooledDB import PooledDB, SharedDBConnection
class Config(object):
DEBUG = True
SECRET_KEY = "umsuldfsdflskjdf"
PERMANENT_SESSION_LIFETIME = timedelta(minutes=20)
SESSION_REFRESH_EACH_REQUEST= True
SESSION_TYPE = "redis"
PYMYSQL_POOL = PooledDB(
creator=pymysql, # 使用鏈接資料庫的模塊
maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數
mincached=2, # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建
maxcached=5, # 鏈接池中最多閑置的鏈接,0和None不限制
maxshared=3,
# 鏈接池中最多共用的鏈接數量,0和None表示全部共用。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共用。
blocking=True, # 連接池中如果沒有可用連接後,是否阻塞等待。True,等待;False,不等待然後報錯
maxusage=None, # 一個鏈接最多被重覆使用的次數,None表示無限制
setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='123456',
database='s8day127db',
charset='utf8'
)
class ProductionConfig(Config):
SESSION_REDIS = Redis(host='192.168.0.94', port='6379')
class DevelopmentConfig(Config):
SESSION_REDIS = Redis(host='127.0.0.1', port='6379')
class TestingConfig(Config):
pass
utils/sql.py
import pymysql
from settings import Config
class SQLHelper(object):
@staticmethod
def open(cursor):
POOL = Config.PYMYSQL_POOL
conn = POOL.connection()
cursor = conn.cursor(cursor=cursor)
return conn,cursor
@staticmethod
def close(conn,cursor):
conn.commit()
cursor.close()
conn.close()
@classmethod
def fetch_one(cls,sql,args,cursor =pymysql.cursors.DictCursor):
conn,cursor = cls.open(cursor)
cursor.execute(sql, args)
obj = cursor.fetchone()
cls.close(conn,cursor)
return obj
@classmethod
def fetch_all(cls,sql, args,cursor =pymysql.cursors.DictCursor):
conn, cursor = cls.open(cursor)
cursor.execute(sql, args)
obj = cursor.fetchall()
cls.close(conn, cursor)
return obj
使用:
obj = SQLHelper.fetch_one("select id,name from users where name=%(user)s and pwd=%(pwd)s", form.data)