[TOC] 簡介 SQLAlchemy是一個基於Python實現的ORM框架。該框架建立在 DB API之上,使用關係對象映射進行資料庫操作,簡言之便是:將類和對象轉換成SQL,然後使用數據API執行SQL並獲取執行結果。 安裝 組成部分 Engine:框架的引擎 Connection Poolin ...
目錄
簡介
SQLAlchemy是一個基於Python實現的ORM框架。該框架建立在 DB API之上,使用關係對象映射進行資料庫操作,簡言之便是:將類和對象轉換成SQL,然後使用數據API執行SQL並獲取執行結果。
安裝
pip3 install sqlalchemy
組成部分
- Engine:框架的引擎
- Connection Pooling:資料庫連接池
- Dialect:選擇連接資料庫的DB API種類
- Schema/Types:架構和類型
- SQL Exprression Language:SQL表達式語言
SQLAlchemy本身無法操作資料庫,其必須以來pymsql等第三方插件,Dialect用於和數據API進行交流,根據配置文件的不同調用不同的資料庫API,從而實現對資料庫的操作,如:
"""
MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
"""
django中如何反向生成models
python manage.py inspectdb > app/models.py
簡單使用
SQLAlchemy只能創建表,刪除表,不能在原先的表上在進行修改,如果要進行修改,可以在資料庫進行修改,然後再在對應的類上進行修改
執行原生sql(不常用)
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
engine = create_engine(
"mysql+pymysql://root:[email protected]:3306/test?charset=utf8",
max_overflow=0, # 超過連接池大小外最多創建的連接
pool_size=5, # 連接池大小
pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯
pool_recycle=-1 # 多久之後對線程池中的線程進行一次連接的回收(重置)
)
def task(arg):
conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(
"select * from app01_book"
)
result = cursor.fetchall()
print(result)
cursor.close()
conn.close()
for i in range(20):
t = threading.Thread(target=task, args=(i,))
t.start()
orm使用(重點)
連接
from sqlalchemy import create_engine
create_engine()
返回一個Engine的實例,並且它表示通過資料庫語法處理細節的核心介面,在這種情況下,資料庫語法將會被解釋稱Python的類方法
engine = create_engine('mysql+pymysql://root:123456@localhost:3306/test',echo=True)
連接 echo參數為True時,會顯示每條執行的sql語句
engine = create_engine('mysql+pymysql://root:123456@localhost:3306/test')
聲明映像
- 通過使用Declarative方法,我們可以創建一些包含描述要被映射的實際資料庫表的準則的映射類。
- 使用Declarative方法定義的映射類依據一個基類,這個基類是維繫類和數據表關係的目錄——我們所說的Declarative base class。在一個普通的模塊入口中,應用通常只需要有一個base的實例。我們通過declarative_base()功能創建一個基類:
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
有了這個Base,我們可以依據這個base定義任意數量的映射類:
class User(Base):
__tablename__ = 'users' # 資料庫表名稱
id = Column(Integer, primary_key=True) # id 主鍵
name = Column(String(32), index=True, nullable=False) # name列,索引,不可為空
# email = Column(String(32), unique=True)
#datetime.datetime.now不能加括弧,加了括弧,以後永遠是當前時間
# ctime = Column(DateTime, default=datetime.datetime.now)
# extra = Column(Text, nullable=True)
__table_args__ = (
# UniqueConstraint('id', 'name', name='uix_id_name'), #聯合唯一
# Index('ix_id_name', 'name', 'email'), #索引
)
註意: 用Declarative 構造的一個類至少需要一個tablename屬性,一個主鍵行。
生成表
SQLAlchemy不能通過類似於與django的makemigerations
和migerate
自動生成表,需要我們自己進行表的生成
def init_db():
"""
根據類創建資料庫表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:[email protected]:3306/aaa?charset=utf8",
max_overflow=0, # 超過連接池大小外最多創建的連接
pool_size=5, # 連接池大小
pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯
pool_recycle=-1 # 多久之後對線程池中的線程進行一次連接的回收(重置)
)
Base.metadata.create_all(engine)
更改表欄位
SQLAlchemy不支持在表創建完成後,再進行表裡面的欄位進行修改,增加,刪除,所以如果要進行表的欄位修改,有兩種方法:
- 手動修改資料庫,然後再在對應的類上進行欄位的修改
- 刪除表,然後修改欄位後,再創建表
刪除表
def drop_db():
"""
根據類刪除資料庫表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:[email protected]:3306/aaa?charset=utf8",
max_overflow=0, # 超過連接池大小外最多創建的連接
pool_size=5, # 連接池大小
pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯
pool_recycle=-1 # 多久之後對線程池中的線程進行一次連接的回收(重置)
)
Base.metadata.drop_all(engine)
完整代碼
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
Base = declarative_base()
class Users(Base):
__tablename__ = 'users' # 資料庫表名稱
id = Column(Integer, primary_key=True) # id 主鍵
name = Column(String(32), index=True, nullable=False) # name列,索引,不可為空
age = Column(Integer, default=0)
# email = Column(String(32), unique=True)
#datetime.datetime.now不能加括弧,加了括弧,以後永遠是當前時間
# ctime = Column(DateTime, default=datetime.datetime.now)
# extra = Column(Text, nullable=True)
__table_args__ = (
# UniqueConstraint('id', 'name', name='uix_id_name'), #聯合唯一
# Index('ix_id_name', 'name', 'email'), #索引
)
def init_db():
"""
根據類創建資料庫表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:[email protected]:3306/aaa?charset=utf8",
max_overflow=0, # 超過連接池大小外最多創建的連接
pool_size=5, # 連接池大小
pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯
pool_recycle=-1 # 多久之後對線程池中的線程進行一次連接的回收(重置)
)
Base.metadata.create_all(engine)
def drop_db():
"""
根據類刪除資料庫表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:[email protected]:3306/aaa?charset=utf8",
max_overflow=0, # 超過連接池大小外最多創建的連接
pool_size=5, # 連接池大小
pool_timeout=30, # 池中沒有線程最多等待的時間,否則報錯
pool_recycle=-1 # 多久之後對線程池中的線程進行一次連接的回收(重置)
)
Base.metadata.drop_all(engine)
if __name__ == '__main__':
# drop_db()
init_db()
常用數據類型
數據類型 | 說明 |
---|---|
Integer | 整形,映射到資料庫中是int類型。 |
Float | 浮點類型,映射到資料庫中是float類型。他占據的32位。 |
Double | 雙精度浮點類型,映射到資料庫中是double類型,占據64位。 |
String | 可變字元類型,映射到資料庫中是varchar類型. |
Boolean | 布爾類型,映射到資料庫中的是tinyint類型。 |
DECIMAL | 定點類型。是專門為瞭解決浮點類型精度丟失的問題的。在存儲錢相關的欄位的時候建議大家都使用這個數據類型。並且這個 類型使用的時候需要傳遞兩個參數,第一個參數是用來標記這個欄位總能能存儲多少個數字,第二個參數表示小數點後有多少位。 |
Enum | 枚舉類型。指定某個欄位只能是枚舉中指定的幾個值,不能為其他值。在ORM模型中,使用Enum來作為枚舉 |
Date | 存儲時間,只能存儲年月日。映射到資料庫中是date類型。在Python代碼中,可以使用datetime.date 來指定 |
DateTime | 存儲時間,可以存儲年月日時分秒毫秒等。映射到資料庫中也是datetime類型。在Python代碼中, 可以使用 datetime.datetime 來指定。 |
Time | 存儲時間,可以存儲時分秒。映射到資料庫中也是time類型。在Python代碼中,可以使用datetime.time 來至此那個。 |
Text | 存儲長字元串。一般可以存儲6W多個字元。如果超出了這個範圍,可以使用LONGTEXT類型。映射到資料庫中就是text類型。 |
LONGTEXT | 長文本類型,映射到資料庫中是longtext類型。 |
Column常用參數
參數 | 詳情 |
---|---|
default | 預設值 |
nullable | 是否為空 |
primary_key | 主鍵 |
unique | 是否唯一 |
autoincrement | 是否自增 |
onupdate | 更新時執行的 |
name | 資料庫映射後的屬性 |
index | 是否建立索引 |
常用操作(CURD)
創建映射類的實例
user1 = User(name='hades', age=18)
user2 = User(name='bonnie', age=16)
創建會話Session
準備好和資料庫會話了,ORM通過Session與資料庫建立連接的
當應用第一次載入時,我們定義一個Session類(聲明Create_engine()的同時),這個Session類為新的Session對象提供工廠服務。
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)
這個定製的Session類會創建綁定到資料庫的Session對象。如果需要和資料庫建立連接,只需要實例化一個session對象
session =Session()
雖然上面的Session已經和資料庫引擎Engine關聯,但是還沒有打開任何連接。當它第一次被使用時,就會從Engine維護的一個連接池中檢索是否存在連接,如果存在便會保持連接知道我們提交所有更改並且/或者關閉session對象。
增加add()/add_all()
# 增加一個
session.add(user1)
session.add(user2)
# 增加多個,可以增加不同的映射實例
# session.add_all([user1, user2, Hosts(ip='127.0.0.1')])
提交commit()
至此,我們可以認為,新添加的這個對象實例還在等待著;user1對象現在並不代表資料庫中的一行數據。直到使用flush進程,Session才會讓SQL保持連接。如果查詢這條數據的話,所有等待信息會被第一時間刷新,查詢結果也會立即發行。
- 通過commit()可以提交所有剩餘的更改到資料庫。
- 註意:提交、查詢都會執行所有的等待信息。
- 所有的增加,修改,刪除都需要commit提交
session.commit()
回滾rollback()
session.rollback()
查詢(重點)
通過Session的query()方法創建一個查詢對象。這個函數的參數數量是可變的,參數可以是任何類或者類的描述集合
下麵是一個迭代輸出User類的例子:
查詢第一個
session.query(Users).filter_by(name='lqz').first()
排序
session.query(User).order_by(User.id).all()
# desc(): 降序,一定要加()
session.query(User).order_by(User.id.desc()).all()
# asc():升序
session.query(User).order_by(Users.name.desc(),User.id.asc()).all()
Query也支持ORM描述作為參數。任何時候,多個類的實體或者是基於列的實體表達都可以作為query()函數的參數,返回類型是元組:
session.query(User.name,User.fullname)
session.query(User,User.name).all()
起別名
- 欄位起別名:
label()
相當於row.name
session.query(User.name.label("name_label")).all()
- 表起別名:
aliased()
from sqlalchemy.orm import aliased
user_alias = aliased(User,name='user_alias')
session.query(user_alias,user_alias.name).all()
Query 的基本操作包括LIMIT和OFFSET,使用python數組切片和ORDERBY結合可以讓操作變得很方便。
限制,用於分頁,區間
只查詢第二條和第三條數據
session.query(User).order_by(User.id)[1:3]
過濾
使用關鍵字變數過濾查詢結果,filter 和filter_by都使用
- filter傳的是表達式,filter_by傳的是參數
session.query(User).filter(User.name=='hades').all()
session.query(User).filter_by(name='bonnie').all()
filter與filter_by的區別:
- filter:可以使用> < 等,但是列必須是: 表.列, filter的等於號是==
- filter:不支持組合查詢
- filter_by: 可以直接寫列,不支持< > filter_by 等於是==
- filter_by 可以支持組合查詢
過濾方法
equals
session.query(User).filter(User.name == 'ed')
not equals
session.query(User).filter(User.name != 'ed')
like
session.query(User).filter(User.name.like('%ed%'))
in
query.filter(User.name.in_(['ed','wendy','jack'])) # 子查詢 session.query(User).filter(User.name.in_(session.query(User.name).filter(User.name.like('%ed%'))
not in
query.filter(~User.name.in_('ed','wendy','jack'))
is null
session.query(User).filter(User.name == None)
is not null
session.query(User).filter(User.name != None)
and
session.query(Users).filter(and_(User.name =='ed',User.fullname =='Ed Jones')) # and session.query(Users).filter(User.name == 'ed',User.fullname =='Ed Jones') # and session.query(Users).filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')# and
or
query.filter(or_(User.name='ed', User.name='wendy'))
占位符查找
#:value 和:name 相當於占位符,用params傳參數 session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()
自定義查詢sql
session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()
統計計數
count = session.query(User).filter(User.name.like("%t%")).count()
分組
session.query(func.count(User.name),User.name).group_by(User.name)
having
having作為分組的篩選條件
session.query(func.min(User.id), func.avg(User.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
聚合函數
func.count
:統計行的數量,和count作用一樣fc=session.query(func.count(User.name),User.name).group_by(User.name).all()
func.avg
:求平均值fc=session.query(func.avg(User.age),User.name).group_by(User.name).all()
func.max
:求最大值fc=session.query(func.max(User.age),User.name).group_by(User.name).all()
func.min
:求最小值fc=session.query(func.min(User.age),User.name).group_by(User.name).all()
func.sum
:求和fc=session.query(func.sum(User.age),User.name).group_by(User.name).all()
修改
第一種:先查詢出對象,然後再賦予對象欄位新的值
obj = session.query(User).filter(User.name=='hades').first() obj.age = 27 session.commit() # 一定要提交
第二種:update()方法,需要傳入一個字典
session.query(User).filter(User.name=='hades').update({'age':27}) session.commit() # 一定要提交
第三種:在原先的基礎上增加,類似於django中的F查詢
比如:年齡加1歲
註意:後面必須配合synchronize_session
- 字元串:
synchronize_session=False
- 數字類型:
synchronize_session=evaluata
session.query(User).filter(User.id > 0).update({User.name: User.name + "099"}, synchronize_session=False) # session.query(User).filter(User.id > 0).update({"age": User.age + 1}, synchronize_session="evaluate") # session.commit()
- 字元串:
刪除delete()
session.query(Users).filter(Users.id > 4).delete()
session.commit()
基於scoped_session實現線程安全
之前我們直接實例化一個sessionmaker
,綁定引擎,獲取Session
對象,然後再實例化,此時會存在一個問題?
場景:如果同時有多個人操作一個表,因為只有全局一個session對象,當某天一個人提交的時候,其他人還沒操作完成,此時就會出現線程安全問題
為瞭解決這個問題,所以就有了scoped_session,將Session
類進行了二次封裝,並不是繼承,但是確擁有Session所有方法。
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Users
engine = create_engine("mysql+pymysql://root:[email protected]:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
"""
# 線程安全,基於本地線程實現每個線程用同一個session
# 特殊的:scoped_session中有原來方法的Session中的一下方法:
public_methods = (
'__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
'close', 'commit', 'connection', 'delete', 'execute', 'expire',
'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind',
'is_modified', 'bulk_save_objects', 'bulk_insert_mappings',
'bulk_update_mappings',
'merge', 'query', 'refresh', 'rollback',
'scalar'
)
"""
#scoped_session類並沒有繼承Session,但是卻又它的所有方法
session = scoped_session(Session)
# ############# 執行ORM操作 #############
obj1 = Users(name="alex1")
session.add(obj1)
# 提交事務
session.commit()
# 關閉session
session.close()