在Flask框架中,實現Token認證機制並不是一件複雜的事情。除了使用官方提供的`flask_httpauth`模塊或者第三方模塊`flask-jwt`,我們還可以考慮自己實現一個簡易版的Token認證工具。自定義Token認證機制的本質是生成一個令牌(Token),併在用戶每次請求時驗證這個令牌... ...
在Flask框架中,實現Token認證機制並不是一件複雜的事情。除了使用官方提供的flask_httpauth
模塊或者第三方模塊flask-jwt
,我們還可以考慮自己實現一個簡易版的Token認證工具。自定義Token認證機制的本質是生成一個令牌(Token),併在用戶每次請求時驗證這個令牌的有效性。
整個過程可以分為以下幾個步驟:
- 用戶登錄時生成Token,並將Token與用戶關聯存儲在伺服器端。
- 用戶在請求時攜帶Token。
- 伺服器在收到請求後,驗證Token的有效性。
- 如果Token有效,允許用戶訪問相應資源;否則,拒絕訪問。
這種自定義的Token認證機制相對簡單,適用於一些小型應用或者對於Token認證機制有特殊需求的場景。搭建這樣一個簡易的認證系統有助於理解Token認證的基本原理,並可以根據實際需求進行靈活的定製。
創建表結構
通過表結構的創建,建立用戶認證和會話管理表。UserAuthDB
表存儲了用戶的賬號密碼信息,而SessionAuthDB
表則存儲了用戶登錄後生成的Token
信息,包括用戶名、Token本身以及Token的過期時間。這為後續實現用戶註冊、登錄以及Token認證等功能提供了資料庫支持。
UserAuthDB表:
- 用途:存儲用戶賬號密碼信息。
- 欄位:
id
: 主鍵,自增,唯一標識每個用戶。username
: 用戶名,非空,唯一,用於登錄時識別用戶。password
: 密碼,非空,用於驗證用戶身份。
SessionAuthDB表:
- 用途:存儲登錄成功後用戶的Token信息。
- 欄位:
id
: 主鍵,自增,唯一標識每個登錄會話。username
: 用戶名,非空,唯一,關聯到UserAuthDB
表的用戶名。token
: 用戶登錄後生成的Token,非空,唯一,用於身份驗證。invalid_date
: Token的過期時間,用於判斷Token是否過期。
代碼通過Flask路由/create
實現了資料庫表結構的創建,主要包括兩張表,分別是UserAuthDB
和SessionAuthDB
。
@app.route("/create",methods=["GET"])
def create():
conn = sqlite3.connect("./database.db")
cursor = conn.cursor()
create_auth = "create table UserAuthDB(" \
"id INTEGER primary key AUTOINCREMENT not null unique," \
"username varchar(64) not null unique," \
"password varchar(64) not null" \
")"
cursor.execute(create_auth)
create_session = "create table SessionAuthDB(" \
"id INTEGER primary key AUTOINCREMENT not null unique," \
"username varchar(64) not null unique," \
"token varchar(128) not null unique," \
"invalid_date int not null" \
")"
cursor.execute(create_session)
conn.commit()
cursor.close()
conn.close()
return "create success"
驗證函數
該驗證函數用於保證傳入的用戶名和密碼滿足一定的安全性和格式要求。通過對長度和字元內容的檢查,確保了傳入的參數不會導致潛在的安全問題。這樣的驗證機制在用戶註冊、登錄等場景中可以有效地防止一些常見的安全漏洞。
參數驗證:
- 接受不定數量的參數
*kwargs
,可傳入多個參數。 - 對於每個傳入的參數,首先驗證其長度是否在合法範圍內(小於128個字元且不為空)。
字元串處理:
- 將參數轉換為小寫形式,然後去除兩側空格,並移除所有空格。
字元內容驗證:
- 遍歷處理後的字元串,檢查其中的字元是否僅包含大寫字母、小寫字母和數字。如果出現其他字元,則認為非法。
返回結果:
- 如果所有參數驗證通過,即長度合法且字元內容符合要求,則返回
True
,表示參數合法。 - 如果有任何一個參數不合法,則返回
False
,表示參數存在非法字元或超出長度限制。
代碼定義了一個名為CheckParameters
的驗證函數,該函數用於驗證傳入的參數是否合法。主要驗證的對象是用戶名和密碼,具體概述如下:
def CheckParameters(*kwargs):
for item in range(len(kwargs)):
# 先驗證長度
if len(kwargs[item]) >= 128 or len(kwargs[item]) == 0:
return False
# 先小寫,然後去掉兩側空格,去掉所有空格
local_string = kwargs[item].lower().strip().replace(" ","")
# 判斷是否只包含 大寫 小寫 數字
for kw in local_string:
if kw.isupper() != True and kw.islower() != True and kw.isdigit() != True:
return False
return True
登錄認證函數
該函數實現了用戶登錄認證的核心邏輯。首先對輸入的用戶名和密碼進行驗證,然後檢查用戶是否存在以及是否已經有生成的Token。如果用戶存在但Token不存在,生成一個新的Token並存入資料庫,最終返回生成的Token。
路由定義:
- 使用
@app.route("/login", methods=["POST"])
定義了一個POST請求的路由,用於處理用戶登錄請求。
參數獲取:
- 通過
request.form.to_dict()
獲取POST請求中的參數,包括用戶名(username)和密碼(password)。
參數驗證:
- 調用之前定義的
CheckParameters
函數對獲取的用戶名和密碼進行合法性驗證,確保其符合安全性和格式要求。
用戶存在性驗證:
- 調用
RunSqlite
函數查詢UserAuthDB
表,驗證用戶名和密碼是否匹配。如果存在匹配的用戶,則繼續執行下一步。
生成Token:
- 查詢
SessionAuthDB
表,檢查是否存在該用戶的Token記錄。如果存在,則直接返回該Token。 - 如果不存在Token記錄,則生成一個32位的隨機Token,並設置過期時間為當前時間戳加上360秒(6分鐘)。
Token寫入資料庫:
- 將生成的Token和過期時間寫入
SessionAuthDB
表。
返回結果:
- 返回生成的Token,作為登錄成功的標識。
@app.route("/login",methods=["POST"])
def login():
if request.method == "POST":
# 獲取參數信息
obtain_dict = request.form.to_dict()
if len(obtain_dict) != 0 and len(obtain_dict) == 2:
username = obtain_dict["username"]
password = obtain_dict["password"]
# 驗證是否合法
is_true = CheckParameters(username,password)
if is_true == True:
# 查詢是否存在該用戶
select = RunSqlite("./database.db", "UserAuthDB", "select", "username,password", f"username='{username}'")
if select[0][0] == username and select[0][1] == password:
# 查詢Session列表是否存在
select_session = RunSqlite("./database.db","SessionAuthDB","select","token",f"username='{username}'")
if select_session != []:
ref = {"message": ""}
ref["message"] = select_session[0][0]
return json.dumps(ref, ensure_ascii=False)
# Session不存在則需要重新生成
else:
# 生成並寫入token和過期時間戳
token = ''.join(random.sample(string.ascii_letters + string.digits, 32))
# 設置360秒周期,過期時間
time_stamp = int(time.time()) + 360
insert = RunSqlite("./database.db", "SessionAuthDB", "insert", "username,token,invalid_date", f"'{username}','{token}',{time_stamp}")
if insert == True:
ref = {"message": ""}
ref["message"] = token
return json.dumps(ref, ensure_ascii=False)
else:
return json.dumps("{'message': '用戶名或密碼錯誤'}", ensure_ascii=False)
else:
return json.dumps("{'message': '輸入參數不可用'}", ensure_ascii=False)
return json.dumps("{'message': '未知錯誤'}", ensure_ascii=False)
登錄認證裝飾器
檢查用戶登錄狀態Token是否過期的裝飾器,裝飾器用於裝飾某一些函數,當主調函數被調用時,會優先執行裝飾器內的代碼,執行後根據裝飾器執行結果返回或退出,裝飾器分為兩種模式,一種是FBV模式,另一種是CBV模式。
FBV(Function-Based Views)和CBV(Class-Based Views)是兩種不同的視圖設計模式,用於處理Web框架中的請求和生成響應。這兩種模式在Django框架中被廣泛使用。
FBV(Function-Based Views)
- 定義: FBV是指使用普通的Python函數來處理請求和生成響應的視圖設計模式。
- 特點:
- 每個視圖對應一個函數,函數接收請求作為參數,返迴響應。
- 簡單,易於理解和使用。
- 視圖的邏輯和處理集中在一個函數中。
示例:
def my_view(request):
# 處理邏輯
return HttpResponse("Hello, World!")
CBV(Class-Based Views)
- 定義: CBV是指使用基於類的Python類來處理請求和生成響應的視圖設計模式。
- 特點:
- 視圖是類,每個類中可以包含多個方法來處理不同HTTP方法(GET、POST等)的請求。
- 提供了更多的代碼組織和復用的可能性,可以使用類的繼承、Mixin等方式。
- 更靈活,適用於複雜的業務邏輯和共用邏輯。
示例:
class MyView(View):
def get(self, request):
# 處理 GET 請求的邏輯
return HttpResponse("Hello, World!")
def post(self, request):
# 處理 POST 請求的邏輯
return HttpResponse("Received a POST request")
FBV與CBV區別
- 結構差異: FBV使用函數,邏輯較為集中;CBV使用類,允許通過類的繼承和Mixin等方式更好地組織代碼。
- 代碼復用: CBV更容易實現代碼復用,可以通過繼承和Mixin在不同的類之間共用邏輯;而FBV需要顯式地將共用邏輯提取為函數。
- 裝飾器: 在FBV中,使用裝飾器來添加額外的功能;而在CBV中,通過類的繼承和Mixin來實現相似的功能。
- 可讀性: 對於簡單的視圖邏輯,FBV可能更直觀易懂;對於較為複雜的業務邏輯,CBV提供了更好的組織和擴展性。
在Flask中,兩種設計模式都可以使用,開發者可以根據項目的需求和個人喜好選擇使用FBV或CBV。
基於FBV的裝飾器設置使用時,需要註意裝飾器嵌入的位置,裝飾器需要在請求進入路由之前,即在請求未走原邏輯代碼的時候介入,對原業務邏輯進行業務拓展。
from flask import Flask, request,render_template
from functools import wraps
app = Flask(__name__)
def login(func):
@wraps(func)
def wrapper(*args, **kwargs):
print("登錄請求: {}".format(request.url))
value = request.form.get("value")
if value == "lyshark":
# 調用原函數,並返回
function_ptr = func(*args, **kwargs)
return function_ptr
else:
return "登錄失敗"
return wrapper
@app.route('/', methods=['GET', 'POST'])
@login
def index():
if request.method == "POST":
value = request.form.get("value")
return "index"
if __name__ == '__main__':
app.run()
而基於CBV的裝飾器設置,使用就顯得更加細分化,可以定製管理專屬功能,在外部定義裝飾器可以全局使用,內部定義可以針對特定路由函數特殊處理。
from flask import Flask, request,render_template,views
from functools import wraps
app = Flask(__name__)
# 裝飾器
def login(func):
@wraps(func)
def wrapper(*args, **kwargs):
print("登錄請求: {}".format(request.url))
value = request.form.get("value")
if value == "lyshark":
# 調用原函數,並返回
function_ptr = func(*args, **kwargs)
return function_ptr
else:
return "登錄失敗"
return wrapper
# 類視圖
class index(views.MethodView):
@login
def get(self):
return request.args
@login
def post(self):
return "success"
# 增加路由
app.add_url_rule(rule='/', view_func=index.as_view('index'))
if __name__ == '__main__':
app.run()
此處為了實現起來更簡單一些此處直接使用FBV模式,我們實現的login_check
裝飾器通過FVB模式構建,代碼中取得用戶的Token以及用戶名對用戶身份進行驗證。
def login_check(func):
@wraps(func)
def wrapper(*args, **kwargs):
print("處理登錄邏輯部分: {}".format(request.url))
# 得到token 驗證是否登陸了,且token沒有過期
local_timestamp = int(time.time())
get_token = request.headers.get("token")
# 驗證傳入參數是否合法
if CheckParameters(get_token) == True:
select = RunSqlite("database.db","SessionAuthDB","select","token,invalid_date",f"token='{get_token}'")
print(select)
# 判斷是否存在記錄,如果存在,在判斷時間戳是否合理
if select != []:
# 如果當前時間與資料庫比對,大於說明過期了需要刪除原來的,讓用戶重新登錄
if local_timestamp >= int(select[0][1]):
print("時間戳過期了")
# 刪除原來的Token
delete = RunSqlite("database.db","SessionAuthDB","delete",f"token='{get_token}'","none")
if delete == True:
return json.dumps("{'token': 'Token 已過期,請重新登錄獲取'}", ensure_ascii=False)
else:
return json.dumps("{'token': '資料庫刪除異常,請聯繫開發者'}", ensure_ascii=False)
else:
# 驗證Token是否一致
if select[0][0] == get_token:
print("Token驗證正常,繼續執行function_ptr指向代碼.")
# 返回到原函數
return func(*args, **kwargs)
else:
print("Token驗證錯誤 {}".format(select))
return json.dumps("{'token': 'Token 傳入錯誤'}", ensure_ascii=False)
# 裝飾器調用原函數
# function_ptr = func(*args, **kwargs)
return json.dumps("{'token': 'Token 驗證失敗'}", ensure_ascii=False)
return wrapper
調用演示
主調用函數則是具體的功能實現可以自定義擴展,當用戶訪問該路由時會優先調用login_check
裝飾器來驗證用戶攜帶Token的合法性,如果合法則會通過return func(*args, **kwargs)
返回執行主調函數,否則直接返回驗證失敗的消息。
# 獲取參數函數
@app.route("/GetPage", methods=["POST"])
@login_check
def GetPage():
if request.method == "POST":
# 獲取參數信息
obtain_dict = request.form.to_dict()
if len(obtain_dict) != 0 and len(obtain_dict) == 1:
pagename = obtain_dict["pagename"]
print("查詢名稱: {}".format(obtain_dict["pagename"]))
# 相應頭的完整寫法
req = Response(response="ok", status=200, mimetype="application/json")
req.headers["Content-Type"] = "text/json; charset=utf-8"
req.headers["Server"] = "LyShark Server 1.0"
req.data = json.dumps("{'message': 'hello world'}")
return req
else:
return json.dumps("{'message': '傳入參數錯誤,請攜帶正確參數請求'}", ensure_ascii=False)
return json.dumps("{'token': '未知錯誤'}", ensure_ascii=False)
# 用戶註冊函數
@app.route("/register", methods=["POST"])
def Register():
if request.method == "POST":
obtain_dict = request.form.to_dict()
if len(obtain_dict) != 0 and len(obtain_dict) == 2:
print("用戶名: {} 密碼: {}".format(obtain_dict["username"], obtain_dict["password"]))
reg_username = obtain_dict["username"]
reg_password = obtain_dict["password"]
# 驗證是否合法
if CheckParameters(reg_username, reg_password) == False:
return json.dumps("{'message': '傳入用戶名密碼不合法'}", ensure_ascii=False)
# 查詢用戶是否存在
select = RunSqlite("database.db","UserAuthDB","select","id",f"username='{reg_username}'")
if select != []:
return json.dumps("{'message': '用戶名已被註冊'}", ensure_ascii=False)
else:
insert = RunSqlite("database.db","UserAuthDB","insert","username,password",f"'{reg_username}','{reg_password}'")
if insert == True:
return json.dumps("{'message': '註冊成功'}", ensure_ascii=False)
else:
return json.dumps("{'message': '註冊失敗'}", ensure_ascii=False)
else:
return json.dumps("{'message': '傳入參數個數不正確'}", ensure_ascii=False)
return json.dumps("{'message': '未知錯誤'}", ensure_ascii=False)
# 密碼修改函數
@app.route("/modify", methods=["POST"])
@login_check
def modify():
if request.method == "POST":
obtain_dict = request.form.to_dict()
if len(obtain_dict) != 0 and len(obtain_dict) == 1:
mdf_password = obtain_dict["password"]
get_token = request.headers.get("token")
print("獲取token: {} 修改後密碼: {}".format(get_token,mdf_password))
# 驗證是否合法
if CheckParameters(get_token, mdf_password) == False:
return json.dumps("{'message': '傳入密碼不合法'}", ensure_ascii=False)
# 先得到token對應用戶名
select = RunSqlite("database.db","SessionAuthDB","select","username",f"token='{get_token}'")
if select != []:
# 接著直接修改密碼即可
modify_username = str(select[0][0])
print("得到的用戶名: {}".format(modify_username))
update = RunSqlite("database.db","UserAuthDB","update",f"username='{modify_username}'",f"password='{mdf_password}'")
if update == True:
# 刪除原來的token,讓用戶重新獲取
delete = RunSqlite("database.db","SessionAuthDB","delete",f"username='{modify_username}'","none")
print("刪除token狀態: {}".format(delete))
return json.dumps("{'message': '修改成功,請重新登錄獲取Token'}", ensure_ascii=False)
else:
return json.dumps("{'message': '修改失敗'}", ensure_ascii=False)
else:
return json.dumps("{'message': '不存在該Token,無法修改密碼'}", ensure_ascii=False)
else:
return json.dumps("{'message': '傳入參數個數不正確'}", ensure_ascii=False)
return json.dumps("{'message': '未知錯誤'}", ensure_ascii=False)
FBV模式下的完整代碼,以下是對代碼的概述:
主要功能:
- 資料庫操作: 封裝了對 SQLite 資料庫的基本增刪改查操作(
RunSqlite
函數)。 - 用戶認證: 提供了用戶登錄、註冊和密碼修改的功能。使用了 Token 機制進行登錄認證,並通過裝飾器
login_check
來驗證 Token 的有效性。 - 創建資料庫表: 提供了一個用於初始化資料庫表結構的介面
/create
。 - 獲取頁面信息: 通過
/GetPage
介面,使用了login_check
裝飾器來驗證用戶登錄狀態,僅對已登錄用戶提供頁面信息。
主要路由
/create
:創建資料庫表結構。/login
:用戶登錄介面,返回用戶的 Token。/GetPage
:獲取頁面信息,需要用戶登錄並攜帶有效 Token。/register
:用戶註冊介面。/modify
:修改用戶密碼介面,需要用戶登錄並攜帶有效 Token。
代碼結構
- 資料庫操作:
- 提供了對 SQLite 資料庫的基本操作,包括插入、更新、查詢和刪除。
- 用戶認證:
- 使用了裝飾器
login_check
對需要登錄的路由進行認證。 - 提供了用戶登錄、註冊和密碼修改的路由。
- 使用了裝飾器
- 創建資料庫表:
- 提供了一個用於初始化資料庫表結構的路由。
- 獲取頁面信息:
- 提供了一個用於獲取頁面信息的路由,需要用戶登錄並攜帶有效 Token。
from flask import Flask,render_template,request,Response,redirect,jsonify
from functools import wraps
import json,sqlite3,random,string,time
app = Flask(__name__)
# 增刪改查簡單封裝
def RunSqlite(db,table,action,field,value):
connect = sqlite3.connect(db)
cursor = connect.cursor()
# 執行插入動作
if action == "insert":
insert = f"insert into {table}({field}) values({value});"
if insert == None or len(insert) == 0:
return False
try:
cursor.execute(insert)
except Exception:
return False
# 執行更新操作
elif action == "update":
update = f"update {table} set {value} where {field};"
if update == None or len(update) == 0:
return False
try:
cursor.execute(update)
except Exception:
return False
# 執行查詢操作
elif action == "select":
# 查詢條件是否為空
if value == "none":
select = f"select {field} from {table};"
else:
select = f"select {field} from {table} where {value};"
try:
ref = cursor.execute(select)
ref_data = ref.fetchall()
connect.commit()
connect.close()
return ref_data
except Exception:
return False
# 執行刪除操作
elif action == "delete":
delete = f"delete from {table} where {field};"
if delete == None or len(delete) == 0:
return False
try:
cursor.execute(delete)
except Exception:
return False
try:
connect.commit()
connect.close()
return True
except Exception:
return False
@app.route("/create",methods=["GET"])
def create():
conn = sqlite3.connect("./database.db")
cursor = conn.cursor()
create_auth = "create table UserAuthDB(" \
"id INTEGER primary key AUTOINCREMENT not null unique," \
"username varchar(64) not null unique," \
"password varchar(64) not null" \
")"
cursor.execute(create_auth)
create_session = "create table SessionAuthDB(" \
"id INTEGER primary key AUTOINCREMENT not null unique," \
"username varchar(64) not null unique," \
"token varchar(128) not null unique," \
"invalid_date int not null" \
")"
cursor.execute(create_session)
conn.commit()
cursor.close()
conn.close()
return "create success"
# 驗證用戶名密碼是否合法
def CheckParameters(*kwargs):
for item in range(len(kwargs)):
# 先驗證長度
if len(kwargs[item]) >= 256 or len(kwargs[item]) == 0:
return False
# 先小寫,然後去掉兩側空格,去掉所有空格
local_string = kwargs[item].lower().strip().replace(" ","")
# 判斷是否只包含 大寫 小寫 數字
for kw in local_string:
if kw.isupper() != True and kw.islower() != True and kw.isdigit() != True:
return False
return True
# 登錄認證模塊
@app.route("/login",methods=["POST"])
def login():
if request.method == "POST":
# 獲取參數信息
obtain_dict = request.form.to_dict()
if len(obtain_dict) != 0 and len(obtain_dict) == 2:
username = obtain_dict["username"]
password = obtain_dict["password"]
# 驗證是否合法
is_true = CheckParameters(username,password)
if is_true == True:
# 查詢是否存在該用戶
select = RunSqlite("./database.db", "UserAuthDB", "select", "username,password", f"username='{username}'")
if select[0][0] == username and select[0][1] == password:
# 查詢Session列表是否存在
select_session = RunSqlite("./database.db","SessionAuthDB","select","token",f"username='{username}'")
if select_session != []:
ref = {"message": ""}
ref["message"] = select_session[0][0]
return json.dumps(ref, ensure_ascii=False)
# Session不存在則需要重新生成
else:
# 生成並寫入token和過期時間戳
token = ''.join(random.sample(string.ascii_letters + string.digits, 32))
# 設置360秒周期,過期時間
time_stamp = int(time.time()) + 360
insert = RunSqlite("./database.db", "SessionAuthDB", "insert", "username,token,invalid_date", f"'{username}','{token}',{time_stamp}")
if insert == True:
ref = {"message": ""}
ref["message"] = token
return json.dumps(ref, ensure_ascii=False)
else:
return json.dumps("{'message': '用戶名或密碼錯誤'}", ensure_ascii=False)
else:
return json.dumps("{'message': '輸入參數不可用'}", ensure_ascii=False)
return json.dumps("{'message': '未知錯誤'}", ensure_ascii=False)
# 檢查登錄狀態 token是否過期的裝飾器
def login_check(func):
@wraps(func)
def wrapper(*args, **kwargs):
print("處理登錄邏輯部分: {}".format(request.url))
# 得到token 驗證是否登陸了,且token沒有過期
local_timestamp = int(time.time())
get_token = request.headers.get("token")
# 驗證傳入參數是否合法
if CheckParameters(get_token) == True:
select = RunSqlite("./database.db","SessionAuthDB","select","token,invalid_date",f"token='{get_token}'")
print(select)
# 判斷是否存在記錄,如果存在,在判斷時間戳是否合理
if select != []:
# 如果當前時間與資料庫比對,大於說明過期了需要刪除原來的,讓用戶重新登錄
if local_timestamp >= int(select[0][1]):
print("時間戳過期了")
# 刪除原來的Token
delete = RunSqlite("./database.db","SessionAuthDB","delete",f"token='{get_token}'","none")
if delete == True:
return json.dumps("{'token': 'Token 已過期,請重新登錄獲取'}", ensure_ascii=False)
else:
return json.dumps("{'token': '資料庫刪除異常,請聯繫開發者'}", ensure_ascii=False)
else:
# 驗證Token是否一致
if select[0][0] == get_token:
print("Token驗證正常,繼續執行function_ptr指向代碼.")
# 返回到原函數
return func(*args, **kwargs)
else:
print("Token驗證錯誤 {}".format(select))
return json.dumps("{'token': 'Token 傳入錯誤'}", ensure_ascii=False)
# 裝飾器調用原函數
# function_ptr = func(*args, **kwargs)
return json.dumps("{'token': 'Token 驗證失敗'}", ensure_ascii=False)
return wrapper
# 獲取參數函數
@app.route("/GetPage", methods=["POST"])
@login_check
def GetPage():
if request.method == "POST":
# 獲取參數信息
obtain_dict = request.form.to_dict()
if len(obtain_dict) != 0 and len(obtain_dict) == 1:
pagename = obtain_dict["pagename"]
print("查詢名稱: {}".format(obtain_dict["pagename"]))
# 相應頭的完整寫法
req = Response(response="ok", status=200, mimetype="application/json")
req.headers["Content-Type"] = "text/json; charset=utf-8"
req.headers["Server"] = "LyShark Server 1.0"
req.data = json.dumps("{'message': 'hello world'}")
return req
else:
return json.dumps("{'message': '傳入參數錯誤,請攜帶正確參數請求'}", ensure_ascii=False)
return json.dumps("{'token': '未知錯誤'}", ensure_ascii=False)
# 用戶註冊函數
@app.route("/register", methods=["POST"])
def Register():
if request.method == "POST":
obtain_dict = request.form.to_dict()
if len(obtain_dict) != 0 and len(obtain_dict) == 2:
print("用戶名: {} 密碼: {}".format(obtain_dict["username"], obtain_dict["password"]))
reg_username = obtain_dict["username"]
reg_password = obtain_dict["password"]
# 驗證是否合法
if CheckParameters(reg_username, reg_password) == False:
return json.dumps("{'message': '傳入用戶名密碼不合法'}", ensure_ascii=False)
# 查詢用戶是否存在
select = RunSqlite("database.db","UserAuthDB","select","id",f"username='{reg_username}'")
if select != []:
return json.dumps("{'message': '用戶名已被註冊'}", ensure_ascii=False)
else:
insert = RunSqlite("database.db","UserAuthDB","insert","username,password",f"'{reg_username}','{reg_password}'")
if insert == True:
return json.dumps("{'message': '註冊成功'}", ensure_ascii=False)
else:
return json.dumps("{'message': '註冊失敗'}", ensure_ascii=False)
else:
return json.dumps("{'message': '傳入參數個數不正確'}", ensure_ascii=False)
return json.dumps("{'message': '未知錯誤'}", ensure_ascii=False)
# 密碼修改函數
@app.route("/modify", methods=["POST"])
@login_check
def modify():
if request.method == "POST":
obtain_dict = request.form.to_dict()
if len(obtain_dict) != 0 and len(obtain_dict) == 1:
mdf_password = obtain_dict["password"]
get_token = request.headers.get("token")
print("獲取token: {} 修改後密碼: {}".format(get_token,mdf_password))
# 驗證是否合法
if CheckParameters(get_token, mdf_password) == False:
return json.dumps("{'message': '傳入密碼不合法'}", ensure_ascii=False)
# 先得到token對應用戶名
select = RunSqlite("./database.db","SessionAuthDB","select","username",f"token='{get_token}'")
if select != []:
# 接著直接修改密碼即可
modify_username = str(select[0][0])
print("得到的用戶名: {}".format(modify_username))
update = RunSqlite("database.db","UserAuthDB","update",f"username='{modify_username}'",f"password='{mdf_password}'")
if update == True:
# 刪除原來的token,讓用戶重新獲取
delete = RunSqlite("./database.db","SessionAuthDB","delete",f"username='{modify_username}'","none")
print("刪除token狀態: {}".format(delete))
return json.dumps("{'message': '修改成功,請重新登錄獲取Token'}", ensure_ascii=False)
else:
return json.dumps("{'message': '修改失敗'}", ensure_ascii=False)
else:
return json.dumps("{'message': '不存在該Token,無法修改密碼'}", ensure_ascii=False)
else:
return json.dumps("{'message': '傳入參數個數不正確'}", ensure_ascii=False)
return json.dumps("{'message': '未知錯誤'}", ensure_ascii=False)
if __name__ == '__main__':
app.run(debug=True)
首先需要在Web頁面訪問http://127.0.0.1/create
路徑實現對資料庫的初始化,並打開Postman
工具,通過傳入參數來使用這個案例。
本博客所有文章除特別聲明外,均採用 BY-NC-SA 許可協議。轉載請註明出處!