之前一篇Python 封裝DBUtils 和pymysql 中寫過一個basedao.py,最近幾天又重新整理了下思緒,優化了下 basedao.py,目前支持的方法還不多,後續會進行改進、添加。 主要功能: 1.查詢單個對象: 所需參數:表名,過濾條件 2.查詢多個對象: 所需參數:表名,過濾條件 ...
之前一篇Python 封裝DBUtils 和pymysql 中寫過一個basedao.py,最近幾天又重新整理了下思緒,優化了下 basedao.py,目前支持的方法還不多,後續會進行改進、添加。
主要功能:
1.查詢單個對象:
所需參數:表名,過濾條件
2.查詢多個對象:
所需參數:表名,過濾條件
3.按主鍵查詢:
所需參數:表名,值
4.分頁查詢:
所需參數:表名,頁碼,每頁記錄數,過濾條件
調用方法鎖獲取的對象都是以字典形式存儲,例如:查詢user表(欄位有id,name,age)里的id=1的數據返回的對象為user = {"id":1,"name","zhangsan","age":18},我們可以通過user.get("id")來獲取id值,非常方便,不用定義什麼類對象來表示。如果查詢的是多個,那麼多個字典對象將會存放在一個列表裡返回。
具體代碼如下:
1 import json, os, sys, time 2 3 import pymysql 4 from DBUtils import PooledDB 5 6 class BaseDao(object): 7 """ 8 簡便的資料庫操作基類 9 """ 10 # 類變數定義在這的時候會出現問題:當程式運行並且實例化了兩個不同的(連接資料庫不同) BaseDao 時, 11 # self.__primaryKey_dict 會出現異常(兩個實例的self.__primaryKey_dict相同)暫不知為何引起這個錯誤。 12 # 我們將在 __init__ 方法中定義類成員。 13 # __config = {} # 資料庫連接配置 14 # __conn = None # 資料庫連接 15 # __cursor = None # 資料庫游標 16 # __database = None # 用於臨時存儲查詢資料庫 17 # __tableName = None # 用於臨時存儲查詢表名 18 # __fields = [] # 用於臨時存儲查詢表的欄位列表 19 # __primaryKey_dict = {} # 用於存儲配置中的資料庫中所有表的主鍵 20 21 def __init__(self, creator=pymysql, host="localhost", user=None, password="", database=None, port=3306, charset="utf8"): 22 if host is None: 23 raise Exception("Parameter [host] is None.") 24 if user is None: 25 raise Exception("Parameter [user] is None.") 26 if password is None: 27 raise Exception("Parameter [password] is None.") 28 if database is None: 29 raise Exception("Parameter [database] is None.") 30 if port is None: 31 raise Exception("Parameter [port] is None.") 32 self.__tableName = None # 用於臨時存儲查詢表名 33 self.__fields = [] # 用於臨時存儲查詢表的欄位列表 34 self.__primaryKey_dict = {} # 用於存儲配置中的資料庫中所有表的主鍵 35 # 資料庫連接配置 36 self.__config = dict({ 37 "creator" : creator, "charset":charset, 38 "host":host, "port":port, 39 "user":user, "password":password, "database":database 40 }) 41 # 資料庫連接 42 self.__conn = PooledDB.connect(**self.__config) 43 # 資料庫游標 44 self.__cursor = self.__conn.cursor() 45 # 用於存儲查詢資料庫 46 self.__database = self.__config["database"] 47 self.__init_primaryKey() 48 print(get_time(), self.__database, "資料庫連接初始化成功。") 49 50 def __del__(self): 51 '重寫類被清除時調用的方法' 52 if self.__cursor: 53 self.__cursor.close() 54 if self.__conn: 55 self.__conn.close() 56 print(get_time(), self.__database, "連接關閉") 57 58 def select_one(self, tableName=None, filters={}): 59 ''' 60 查詢單個對象 61 @tableName 表名 62 @filters 過濾條件 63 @return 返回字典集合,集合中以表欄位作為 key,欄位值作為 value 64 ''' 65 self.__check_params(tableName) 66 sql = self.__query_util(filters) 67 self.__cursor.execute(sql) 68 result = self.__cursor.fetchone() 69 return self.__parse_result(result) 70 71 def select_pk(self, tableName=None, primaryKey=None): 72 ''' 73 按主鍵查詢 74 @tableName 表名 75 @primaryKey 主鍵值 76 ''' 77 self.__check_params(tableName) 78 filters = {} 79 filters.setdefault(str(self.__primaryKey_dict[tableName]), primaryKey) 80 sql = self.__query_util(filters) 81 self.__cursor.execute(sql) 82 result = self.__cursor.fetchone() 83 return self.__parse_result(result) 84 85 def select_all(self, tableName=None, filters={}): 86 ''' 87 查詢所有 88 @tableName 表名 89 @filters 過濾條件 90 @return 返回字典集合,集合中以表欄位作為 key,欄位值作為 value 91 ''' 92 self.__check_params(tableName) 93 sql = self.__query_util(filters) 94 self.__cursor.execute(sql) 95 results = self.__cursor.fetchall() 96 return self.__parse_results(results) 97 98 def count(self, tableName=None): 99 ''' 100 統計記錄數 101 ''' 102 self.__check_params(tableName) 103 sql = "SELECT count(*) FROM %s"%(self.__tableName) 104 self.__cursor.execute(sql) 105 result = self.__cursor.fetchone() 106 return result[0] 107 108 def select_page(self, tableName=None, pageNum=1, limit=10, filters={}): 109 ''' 110 分頁查詢 111 @tableName 表名 112 @return 返回字典集合,集合中以表欄位作為 key,欄位值作為 value 113 ''' 114 self.__check_params(tableName) 115 totalCount = self.count() 116 if totalCount / limit == 0 : 117 totalPage = totalCount / limit 118 else: 119 totalPage = totalCount // limit + 1 120 if pageNum > totalPage: 121 print("最大頁數為%d"%totalPage) 122 pageNum = totalPage 123 elif pageNum < 1: 124 print("頁數不能小於1") 125 pageNum = 1 126 beginindex = (pageNum-1) * limit 127 filters.setdefault("_limit_", (beginindex, limit)) 128 sql = self.__query_util(filters) 129 self.__cursor.execute(sql) 130 results = self.__cursor.fetchall() 131 return self.__parse_results(results) 132 133 def select_database_struts(self): 134 ''' 135 查找當前連接配置中的資料庫結構以字典集合 136 ''' 137 sql = '''SELECT COLUMN_NAME, IS_NULLABLE, COLUMN_TYPE, COLUMN_KEY, COLUMN_COMMENT 138 FROM information_schema.`COLUMNS` 139 WHERE TABLE_SCHEMA="%s" AND TABLE_NAME="{0}" '''%(self.__database) 140 struts = {} 141 for k in self.__primaryKey_dict.keys(): 142 self.__cursor.execute(sql.format(k)) 143 results = self.__cursor.fetchall() 144 struts[k] = {} 145 for result in results: 146 struts[k][result[0]] = {} 147 struts[k][result[0]]["COLUMN_NAME"] = result[0] 148 struts[k][result[0]]["IS_NULLABLE"] = result[1] 149 struts[k][result[0]]["COLUMN_TYPE"] = result[2] 150 struts[k][result[0]]["COLUMN_KEY"] = result[3] 151 struts[k][result[0]]["COLUMN_COMMENT"] = result[4] 152 return self.__config, struts 153 154 def __parse_result(self, result): 155 '用於解析單個查詢結果,返回字典對象' 156 obj = {} 157 for k,v in zip(self.__fields, result): 158 obj[k] = v 159 return obj 160 161 def __parse_results(self, results): 162 '用於解析多個查詢結果,返回字典列表對象' 163 objs = [] 164 for result in results: 165 obj = self.__parse_result(result) 166 objs.append(obj) 167 return objs 168 169 def __init_primaryKey(self): 170 '根據配置中的資料庫讀取該資料庫中所有表的主鍵集合' 171 sql = """SELECT TABLE_NAME, COLUMN_NAME 172 FROM Information_schema.columns 173 WHERE COLUMN_KEY='PRI' AND TABLE_SCHEMA='%s'"""%(self.__database) 174 self.__cursor.execute(sql) 175 results = self.__cursor.fetchall() 176 for result in results: 177 self.__primaryKey_dict[result[0]] = result[1] 178 179 def __query_fields(self, tableName=None, database=None): 180 '查詢表的欄位列表, 將查詢出來的欄位列表存入 __fields 中' 181 sql = """SELECT column_name 182 FROM Information_schema.columns 183 WHERE table_Name = '%s' AND TABLE_SCHEMA='%s'"""%(tableName, database) 184 self.__cursor.execute(sql) 185 fields_tuple = self.__cursor.fetchall() 186 self.__fields = [fields[0] for fields in fields_tuple] 187 188 def __query_util(self, filters=None): 189 """ 190 SQL 語句拼接方法 191 @filters 過濾條件 192 """ 193 sql = r'SELECT #{FIELDS} FROM #{TABLE_NAME} WHERE 1=1 #{FILTERS}' 194 # 拼接查詢表 195 sql = sql.replace("#{TABLE_NAME}", self.__tableName) 196 # 拼接查詢欄位 197 self.__query_fields(self.__tableName, self.__database) 198 FIELDS = "" 199 for field in self.__fields: 200 FIELDS += field + ", " 201 FIELDS = FIELDS[0: len(FIELDS)-2] 202 sql = sql.replace("#{FIELDS}", FIELDS) 203 # 拼接查詢條件(待優化) 204 if filters is None: 205 sql = sql.replace("#{FILTERS}", "") 206 else: 207 FILTERS = "" 208 if not isinstance(filters, dict): 209 raise Exception("Parameter [filters] must be dict type. ") 210 isPage = False 211 if filters.get("_limit_"): 212 isPage = True 213 beginindex, limit = filters.get("_limit_") 214 for k, v in filters.items(): 215 if k.startswith("_in_"): # 拼接 in 216 FILTERS += "AND %s IN (" %(k[4:]) 217 values = v.split(",") 218 for value in values: 219 FILTERS += "%s,"%value 220 FILTERS = FILTERS[0:len(FILTERS)-1] + ") " 221 elif k.startswith("_nein_"): # 拼接 not in 222 FILTERS += "AND %s NOT IN (" %(k[4:]) 223 values = v.split(",") 224 for value in values: 225 FILTERS += "%s,"%value 226 FILTERS = FILTERS[0:len(FILTERS)-1] + ") " 227 elif k.startswith("_like_"): # 拼接 like 228 FILTERS += "AND %s like '%%%s%%' " %(k[6:], v) 229 elif k.startswith("_ne_"): # 拼接不等於 230 FILTERS += "AND %s != '%s' " %(k[4:], v) 231 elif k.startswith("_lt_"): # 拼接小於 232 FILTERS += "AND %s < '%s' " %(k[4:], v) 233 elif k.startswith("_le_"): # 拼接小於等於 234 FILTERS += "AND %s <= '%s' " %(k[4:], v) 235 elif k.startswith("_gt_"): # 拼接大於 236 FILTERS += "AND %s > '%s' " %(k[4:], v) 237 elif k.startswith("_ge_"): # 拼接大於等於 238 FILTERS += "AND %s >= '%s' " %(k[4:], v) 239 elif k in self.__fields: # 拼接等於 240 FILTERS += "AND %s = '%s' "%(k, v) 241 sql = sql.replace("#{FILTERS}", FILTERS) 242 if isPage: 243 sql += "LIMIT %d,%d"%(beginindex, limit) 244 245 print(get_time(), sql) 246 return sql 247 248 def __check_params(self, tableName): 249 ''' 250 檢查參數 251 ''' 252 if tableName: 253 self.__tableName = tableName 254 else: 255 if self.__tableName is None: 256 raise Exception("Parameter [tableName] is None.") 257 258 def get_time(): 259 return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) 260 261 if __name__ == "__main__": 262 config = { 263 # "creator": pymysql, 264 # "host" : "127.0.0.1", 265 "user" : "root", 266 "password" : "root", 267 "database" : "test", 268 # "port" : 3306, 269 # "charset" : 'utf8' 270 } 271 base = BaseDao(**config) 272 ######################################################################## 273 # user = base.select_one("user") 274 # print(user) 275 ######################################################################## 276 # users = base.select_all("user") 277 # print(users) 278 ######################################################################## 279 # filter1 = { 280 # "sex":0, 281 # "_in_id":"1,2,3,4,5", 282 # "_like_name":"zhang", 283 # "_ne_name":"wangwu" 284 # } 285 # user_filters = base.select_all(tableName="user", filters=filter1) 286 # print(user_filters) 287 ######################################################################## 288 # menu = base.select_one(tableName="menu") 289 # print(menu) 290 ######################################################################## 291 # user_pk = base.select_pk("user", 2) 292 # print(user_pk) 293 ######################################################################## 294 # filter2 = { 295 # "_in_id":"1,2,3,4", 296 # "_like_name":"test" 297 # } 298 # user_limit = base.select_page("user", 2, 10, filter2) #未實現 299 # print(user_limit) 300 ########################################################################View Code
代碼中已經給出了幾個具體示例,大家可以參考使用。
如果有感興趣一起學習、討論Python的可以加QQ群:626787819,有啥意見或者建議的可以發我郵箱:[email protected]。