Flask 實現Token認證機制

来源:https://www.cnblogs.com/LyShark/archive/2023/11/27/17859045.html
-Advertisement-
Play Games

在Flask框架中,實現Token認證機制並不是一件複雜的事情。除了使用官方提供的`flask_httpauth`模塊或者第三方模塊`flask-jwt`,我們還可以考慮自己實現一個簡易版的Token認證工具。自定義Token認證機制的本質是生成一個令牌(Token),併在用戶每次請求時驗證這個令牌... ...


在Flask框架中,實現Token認證機制並不是一件複雜的事情。除了使用官方提供的flask_httpauth模塊或者第三方模塊flask-jwt,我們還可以考慮自己實現一個簡易版的Token認證工具。自定義Token認證機制的本質是生成一個令牌(Token),併在用戶每次請求時驗證這個令牌的有效性。

整個過程可以分為以下幾個步驟:

  1. 用戶登錄時生成Token,並將Token與用戶關聯存儲在伺服器端。
  2. 用戶在請求時攜帶Token。
  3. 伺服器在收到請求後,驗證Token的有效性。
  4. 如果Token有效,允許用戶訪問相應資源;否則,拒絕訪問。

這種自定義的Token認證機制相對簡單,適用於一些小型應用或者對於Token認證機制有特殊需求的場景。搭建這樣一個簡易的認證系統有助於理解Token認證的基本原理,並可以根據實際需求進行靈活的定製。

創建表結構

通過表結構的創建,建立用戶認證和會話管理表。UserAuthDB表存儲了用戶的賬號密碼信息,而SessionAuthDB表則存儲了用戶登錄後生成的Token信息,包括用戶名、Token本身以及Token的過期時間。這為後續實現用戶註冊、登錄以及Token認證等功能提供了資料庫支持。

UserAuthDB表:

  • 用途:存儲用戶賬號密碼信息。
  • 欄位:
    • id: 主鍵,自增,唯一標識每個用戶。
    • username: 用戶名,非空,唯一,用於登錄時識別用戶。
    • password: 密碼,非空,用於驗證用戶身份。

SessionAuthDB表:

  • 用途:存儲登錄成功後用戶的Token信息。
  • 欄位:
    • id: 主鍵,自增,唯一標識每個登錄會話。
    • username: 用戶名,非空,唯一,關聯到UserAuthDB表的用戶名。
    • token: 用戶登錄後生成的Token,非空,唯一,用於身份驗證。
    • invalid_date: Token的過期時間,用於判斷Token是否過期。

代碼通過Flask路由/create實現了資料庫表結構的創建,主要包括兩張表,分別是UserAuthDBSessionAuthDB

@app.route("/create",methods=["GET"])
def create():
    conn = sqlite3.connect("./database.db")
    cursor = conn.cursor()
    create_auth = "create table UserAuthDB(" \
             "id INTEGER primary key AUTOINCREMENT not null unique," \
             "username varchar(64) not null unique," \
             "password varchar(64) not null" \
             ")"
    cursor.execute(create_auth)

    create_session = "create table SessionAuthDB(" \
                     "id INTEGER primary key AUTOINCREMENT not null unique," \
                     "username varchar(64) not null unique," \
                     "token varchar(128) not null unique," \
                     "invalid_date int not null" \
                     ")"

    cursor.execute(create_session)
    conn.commit()
    cursor.close()
    conn.close()
    return "create success"

驗證函數

該驗證函數用於保證傳入的用戶名和密碼滿足一定的安全性和格式要求。通過對長度和字元內容的檢查,確保了傳入的參數不會導致潛在的安全問題。這樣的驗證機制在用戶註冊、登錄等場景中可以有效地防止一些常見的安全漏洞。

參數驗證:

  • 接受不定數量的參數*kwargs,可傳入多個參數。
  • 對於每個傳入的參數,首先驗證其長度是否在合法範圍內(小於128個字元且不為空)。

字元串處理:

  • 將參數轉換為小寫形式,然後去除兩側空格,並移除所有空格。

字元內容驗證:

  • 遍歷處理後的字元串,檢查其中的字元是否僅包含大寫字母、小寫字母和數字。如果出現其他字元,則認為非法。

返回結果:

  • 如果所有參數驗證通過,即長度合法且字元內容符合要求,則返回True,表示參數合法。
  • 如果有任何一個參數不合法,則返回False,表示參數存在非法字元或超出長度限制。

代碼定義了一個名為CheckParameters的驗證函數,該函數用於驗證傳入的參數是否合法。主要驗證的對象是用戶名和密碼,具體概述如下:

def CheckParameters(*kwargs):
    for item in range(len(kwargs)):
        # 先驗證長度
        if len(kwargs[item]) >= 128 or len(kwargs[item]) == 0:
            return False

        # 先小寫,然後去掉兩側空格,去掉所有空格
        local_string = kwargs[item].lower().strip().replace(" ","")

        # 判斷是否只包含 大寫 小寫 數字
        for kw in local_string:
            if kw.isupper() != True and kw.islower() != True and kw.isdigit() != True:
                return False
    return True

登錄認證函數

該函數實現了用戶登錄認證的核心邏輯。首先對輸入的用戶名和密碼進行驗證,然後檢查用戶是否存在以及是否已經有生成的Token。如果用戶存在但Token不存在,生成一個新的Token並存入資料庫,最終返回生成的Token。

路由定義:

  • 使用@app.route("/login", methods=["POST"])定義了一個POST請求的路由,用於處理用戶登錄請求。

參數獲取:

  • 通過request.form.to_dict()獲取POST請求中的參數,包括用戶名(username)和密碼(password)。

參數驗證:

  • 調用之前定義的CheckParameters函數對獲取的用戶名和密碼進行合法性驗證,確保其符合安全性和格式要求。

用戶存在性驗證:

  • 調用RunSqlite函數查詢UserAuthDB表,驗證用戶名和密碼是否匹配。如果存在匹配的用戶,則繼續執行下一步。

生成Token:

  • 查詢SessionAuthDB表,檢查是否存在該用戶的Token記錄。如果存在,則直接返回該Token。
  • 如果不存在Token記錄,則生成一個32位的隨機Token,並設置過期時間為當前時間戳加上360秒(6分鐘)。

Token寫入資料庫:

  • 將生成的Token和過期時間寫入SessionAuthDB表。

返回結果:

  • 返回生成的Token,作為登錄成功的標識。
@app.route("/login",methods=["POST"])
def login():
    if request.method == "POST":
        # 獲取參數信息
        obtain_dict = request.form.to_dict()
        if len(obtain_dict) != 0 and len(obtain_dict) == 2:
            username = obtain_dict["username"]
            password = obtain_dict["password"]

            # 驗證是否合法
            is_true = CheckParameters(username,password)
            if is_true == True:

                # 查詢是否存在該用戶
                select = RunSqlite("./database.db", "UserAuthDB", "select", "username,password", f"username='{username}'")
                if select[0][0] == username and select[0][1] == password:
                    # 查詢Session列表是否存在
                    select_session = RunSqlite("./database.db","SessionAuthDB","select","token",f"username='{username}'")
                    if select_session != []:
                        ref = {"message": ""}
                        ref["message"] = select_session[0][0]
                        return json.dumps(ref, ensure_ascii=False)

                    # Session不存在則需要重新生成
                    else:
                        # 生成並寫入token和過期時間戳
                        token = ''.join(random.sample(string.ascii_letters + string.digits, 32))

                        # 設置360秒周期,過期時間
                        time_stamp = int(time.time()) + 360

                        insert = RunSqlite("./database.db", "SessionAuthDB", "insert", "username,token,invalid_date", f"'{username}','{token}',{time_stamp}")
                        if insert == True:
                            ref = {"message": ""}
                            ref["message"] = token
                            return json.dumps(ref, ensure_ascii=False)
                else:
                    return json.dumps("{'message': '用戶名或密碼錯誤'}", ensure_ascii=False)
            else:
                return json.dumps("{'message': '輸入參數不可用'}", ensure_ascii=False)

    return json.dumps("{'message': '未知錯誤'}", ensure_ascii=False)

登錄認證裝飾器

檢查用戶登錄狀態Token是否過期的裝飾器,裝飾器用於裝飾某一些函數,當主調函數被調用時,會優先執行裝飾器內的代碼,執行後根據裝飾器執行結果返回或退出,裝飾器分為兩種模式,一種是FBV模式,另一種是CBV模式。

FBV(Function-Based Views)和CBV(Class-Based Views)是兩種不同的視圖設計模式,用於處理Web框架中的請求和生成響應。這兩種模式在Django框架中被廣泛使用。

FBV(Function-Based Views)

  • 定義: FBV是指使用普通的Python函數來處理請求和生成響應的視圖設計模式。
  • 特點:
    • 每個視圖對應一個函數,函數接收請求作為參數,返迴響應。
    • 簡單,易於理解和使用。
    • 視圖的邏輯和處理集中在一個函數中。

示例:

def my_view(request):
    # 處理邏輯
    return HttpResponse("Hello, World!")

CBV(Class-Based Views)

  • 定義: CBV是指使用基於類的Python類來處理請求和生成響應的視圖設計模式。
  • 特點:
    • 視圖是類,每個類中可以包含多個方法來處理不同HTTP方法(GET、POST等)的請求。
    • 提供了更多的代碼組織和復用的可能性,可以使用類的繼承、Mixin等方式。
    • 更靈活,適用於複雜的業務邏輯和共用邏輯。

示例:

class MyView(View):
    def get(self, request):
        # 處理 GET 請求的邏輯
        return HttpResponse("Hello, World!")

    def post(self, request):
        # 處理 POST 請求的邏輯
        return HttpResponse("Received a POST request")

FBV與CBV區別

  1. 結構差異: FBV使用函數,邏輯較為集中;CBV使用類,允許通過類的繼承和Mixin等方式更好地組織代碼。
  2. 代碼復用: CBV更容易實現代碼復用,可以通過繼承和Mixin在不同的類之間共用邏輯;而FBV需要顯式地將共用邏輯提取為函數。
  3. 裝飾器: 在FBV中,使用裝飾器來添加額外的功能;而在CBV中,通過類的繼承和Mixin來實現相似的功能。
  4. 可讀性: 對於簡單的視圖邏輯,FBV可能更直觀易懂;對於較為複雜的業務邏輯,CBV提供了更好的組織和擴展性。

在Flask中,兩種設計模式都可以使用,開發者可以根據項目的需求和個人喜好選擇使用FBV或CBV。

基於FBV的裝飾器設置使用時,需要註意裝飾器嵌入的位置,裝飾器需要在請求進入路由之前,即在請求未走原邏輯代碼的時候介入,對原業務邏輯進行業務拓展。

from flask import Flask, request,render_template
from functools import wraps

app = Flask(__name__)

def login(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("登錄請求: {}".format(request.url))

        value = request.form.get("value")
        if value == "lyshark":
            # 調用原函數,並返回
            function_ptr = func(*args, **kwargs)
            return function_ptr
        else:
            return "登錄失敗"
    return wrapper

@app.route('/', methods=['GET', 'POST'])
@login
def index():
    if request.method == "POST":
        value = request.form.get("value")
    return "index"

if __name__ == '__main__':
    app.run()

而基於CBV的裝飾器設置,使用就顯得更加細分化,可以定製管理專屬功能,在外部定義裝飾器可以全局使用,內部定義可以針對特定路由函數特殊處理。

from flask import Flask, request,render_template,views
from functools import wraps

app = Flask(__name__)

# 裝飾器
def login(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("登錄請求: {}".format(request.url))

        value = request.form.get("value")
        if value == "lyshark":
            # 調用原函數,並返回
            function_ptr = func(*args, **kwargs)
            return function_ptr
        else:
            return "登錄失敗"
    return wrapper

# 類視圖
class index(views.MethodView):
    @login
    def get(self):
        return request.args

    @login
    def post(self):
        return "success"

# 增加路由
app.add_url_rule(rule='/', view_func=index.as_view('index'))

if __name__ == '__main__':
    app.run()

此處為了實現起來更簡單一些此處直接使用FBV模式,我們實現的login_check裝飾器通過FVB模式構建,代碼中取得用戶的Token以及用戶名對用戶身份進行驗證。

def login_check(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("處理登錄邏輯部分: {}".format(request.url))

        # 得到token 驗證是否登陸了,且token沒有過期
        local_timestamp = int(time.time())
        get_token = request.headers.get("token")

        # 驗證傳入參數是否合法
        if CheckParameters(get_token) == True:
            select = RunSqlite("database.db","SessionAuthDB","select","token,invalid_date",f"token='{get_token}'")
            print(select)
            # 判斷是否存在記錄,如果存在,在判斷時間戳是否合理
            if select != []:
                # 如果當前時間與資料庫比對,大於說明過期了需要刪除原來的,讓用戶重新登錄
                if local_timestamp >= int(select[0][1]):
                    print("時間戳過期了")
                    # 刪除原來的Token
                    delete = RunSqlite("database.db","SessionAuthDB","delete",f"token='{get_token}'","none")
                    if delete == True:
                        return json.dumps("{'token': 'Token 已過期,請重新登錄獲取'}", ensure_ascii=False)
                    else:
                        return json.dumps("{'token': '資料庫刪除異常,請聯繫開發者'}", ensure_ascii=False)
                else:
                    # 驗證Token是否一致
                    if select[0][0] == get_token:
                        print("Token驗證正常,繼續執行function_ptr指向代碼.")
                        # 返回到原函數
                        return func(*args, **kwargs)
                    else:
                        print("Token驗證錯誤 {}".format(select))
                        return json.dumps("{'token': 'Token 傳入錯誤'}", ensure_ascii=False)

            # 裝飾器調用原函數
            # function_ptr = func(*args, **kwargs)

        return json.dumps("{'token': 'Token 驗證失敗'}", ensure_ascii=False)
    return wrapper

調用演示

主調用函數則是具體的功能實現可以自定義擴展,當用戶訪問該路由時會優先調用login_check裝飾器來驗證用戶攜帶Token的合法性,如果合法則會通過return func(*args, **kwargs)返回執行主調函數,否則直接返回驗證失敗的消息。

# 獲取參數函數
@app.route("/GetPage", methods=["POST"])
@login_check
def GetPage():
    if request.method == "POST":
        # 獲取參數信息
        obtain_dict = request.form.to_dict()
        if len(obtain_dict) != 0 and len(obtain_dict) == 1:

            pagename = obtain_dict["pagename"]
            print("查詢名稱: {}".format(obtain_dict["pagename"]))

            # 相應頭的完整寫法
            req = Response(response="ok", status=200, mimetype="application/json")
            req.headers["Content-Type"] = "text/json; charset=utf-8"
            req.headers["Server"] = "LyShark Server 1.0"
            req.data = json.dumps("{'message': 'hello world'}")
            return req
        else:
            return json.dumps("{'message': '傳入參數錯誤,請攜帶正確參數請求'}", ensure_ascii=False)
    return json.dumps("{'token': '未知錯誤'}", ensure_ascii=False)

# 用戶註冊函數
@app.route("/register", methods=["POST"])
def Register():
    if request.method == "POST":
        obtain_dict = request.form.to_dict()
        if len(obtain_dict) != 0 and len(obtain_dict) == 2:

            print("用戶名: {} 密碼: {}".format(obtain_dict["username"], obtain_dict["password"]))
            reg_username = obtain_dict["username"]
            reg_password = obtain_dict["password"]

            # 驗證是否合法
            if CheckParameters(reg_username, reg_password) == False:
                return json.dumps("{'message': '傳入用戶名密碼不合法'}", ensure_ascii=False)

            # 查詢用戶是否存在
            select = RunSqlite("database.db","UserAuthDB","select","id",f"username='{reg_username}'")
            if select != []:
                return json.dumps("{'message': '用戶名已被註冊'}", ensure_ascii=False)
            else:
                insert = RunSqlite("database.db","UserAuthDB","insert","username,password",f"'{reg_username}','{reg_password}'")
                if insert == True:
                    return json.dumps("{'message': '註冊成功'}", ensure_ascii=False)
                else:
                    return json.dumps("{'message': '註冊失敗'}", ensure_ascii=False)
        else:
            return json.dumps("{'message': '傳入參數個數不正確'}", ensure_ascii=False)
    return json.dumps("{'message': '未知錯誤'}", ensure_ascii=False)

# 密碼修改函數
@app.route("/modify", methods=["POST"])
@login_check
def modify():
    if request.method == "POST":
        obtain_dict = request.form.to_dict()
        if len(obtain_dict) != 0 and len(obtain_dict) == 1:

            mdf_password = obtain_dict["password"]
            get_token = request.headers.get("token")
            print("獲取token: {} 修改後密碼: {}".format(get_token,mdf_password))

            # 驗證是否合法
            if CheckParameters(get_token, mdf_password) == False:
                return json.dumps("{'message': '傳入密碼不合法'}", ensure_ascii=False)

            # 先得到token對應用戶名
            select = RunSqlite("database.db","SessionAuthDB","select","username",f"token='{get_token}'")
            if select != []:
                # 接著直接修改密碼即可
                modify_username = str(select[0][0])
                print("得到的用戶名: {}".format(modify_username))
                update = RunSqlite("database.db","UserAuthDB","update",f"username='{modify_username}'",f"password='{mdf_password}'")
                if update == True:
                    # 刪除原來的token,讓用戶重新獲取
                    delete = RunSqlite("database.db","SessionAuthDB","delete",f"username='{modify_username}'","none")
                    print("刪除token狀態: {}".format(delete))
                    return json.dumps("{'message': '修改成功,請重新登錄獲取Token'}", ensure_ascii=False)

                else:
                    return json.dumps("{'message': '修改失敗'}", ensure_ascii=False)
            else:
                return json.dumps("{'message': '不存在該Token,無法修改密碼'}", ensure_ascii=False)
        else:
            return json.dumps("{'message': '傳入參數個數不正確'}", ensure_ascii=False)
    return json.dumps("{'message': '未知錯誤'}", ensure_ascii=False)

FBV模式下的完整代碼,以下是對代碼的概述:

主要功能:

  1. 資料庫操作: 封裝了對 SQLite 資料庫的基本增刪改查操作(RunSqlite 函數)。
  2. 用戶認證: 提供了用戶登錄、註冊和密碼修改的功能。使用了 Token 機制進行登錄認證,並通過裝飾器 login_check 來驗證 Token 的有效性。
  3. 創建資料庫表: 提供了一個用於初始化資料庫表結構的介面 /create
  4. 獲取頁面信息: 通過 /GetPage 介面,使用了 login_check 裝飾器來驗證用戶登錄狀態,僅對已登錄用戶提供頁面信息。

主要路由

  • /create:創建資料庫表結構。
  • /login:用戶登錄介面,返回用戶的 Token。
  • /GetPage:獲取頁面信息,需要用戶登錄並攜帶有效 Token。
  • /register:用戶註冊介面。
  • /modify:修改用戶密碼介面,需要用戶登錄並攜帶有效 Token。

代碼結構

  1. 資料庫操作:
    • 提供了對 SQLite 資料庫的基本操作,包括插入、更新、查詢和刪除。
  2. 用戶認證:
    • 使用了裝飾器 login_check 對需要登錄的路由進行認證。
    • 提供了用戶登錄、註冊和密碼修改的路由。
  3. 創建資料庫表:
    • 提供了一個用於初始化資料庫表結構的路由。
  4. 獲取頁面信息:
    • 提供了一個用於獲取頁面信息的路由,需要用戶登錄並攜帶有效 Token。
from flask import Flask,render_template,request,Response,redirect,jsonify
from functools import wraps
import json,sqlite3,random,string,time

app = Flask(__name__)

# 增刪改查簡單封裝
def RunSqlite(db,table,action,field,value):
    connect = sqlite3.connect(db)
    cursor = connect.cursor()

    # 執行插入動作
    if action == "insert":
        insert = f"insert into {table}({field}) values({value});"
        if insert == None or len(insert) == 0:
            return False
        try:
            cursor.execute(insert)
        except Exception:
            return False

    # 執行更新操作
    elif action == "update":
        update = f"update {table} set {value} where {field};"
        if update == None or len(update) == 0:
            return False
        try:
            cursor.execute(update)
        except Exception:
            return False

    # 執行查詢操作
    elif action == "select":

        # 查詢條件是否為空
        if value == "none":
            select = f"select {field} from {table};"
        else:
            select = f"select {field} from {table} where {value};"

        try:
            ref = cursor.execute(select)
            ref_data = ref.fetchall()
            connect.commit()
            connect.close()
            return ref_data
        except Exception:
            return False

    # 執行刪除操作
    elif action == "delete":
        delete = f"delete from {table} where {field};"
        if delete == None or len(delete) == 0:
            return False
        try:
            cursor.execute(delete)
        except Exception:
            return False
    try:
        connect.commit()
        connect.close()
        return True
    except Exception:
        return False

@app.route("/create",methods=["GET"])
def create():
    conn = sqlite3.connect("./database.db")
    cursor = conn.cursor()
    create_auth = "create table UserAuthDB(" \
             "id INTEGER primary key AUTOINCREMENT not null unique," \
             "username varchar(64) not null unique," \
             "password varchar(64) not null" \
             ")"
    cursor.execute(create_auth)

    create_session = "create table SessionAuthDB(" \
                     "id INTEGER primary key AUTOINCREMENT not null unique," \
                     "username varchar(64) not null unique," \
                     "token varchar(128) not null unique," \
                     "invalid_date int not null" \
                     ")"

    cursor.execute(create_session)
    conn.commit()
    cursor.close()
    conn.close()
    return "create success"

# 驗證用戶名密碼是否合法
def CheckParameters(*kwargs):
    for item in range(len(kwargs)):
        # 先驗證長度
        if len(kwargs[item]) >= 256 or len(kwargs[item]) == 0:
            return False

        # 先小寫,然後去掉兩側空格,去掉所有空格
        local_string = kwargs[item].lower().strip().replace(" ","")

        # 判斷是否只包含 大寫 小寫 數字
        for kw in local_string:
            if kw.isupper() != True and kw.islower() != True and kw.isdigit() != True:
                return False
    return True

# 登錄認證模塊
@app.route("/login",methods=["POST"])
def login():
    if request.method == "POST":
        # 獲取參數信息
        obtain_dict = request.form.to_dict()
        if len(obtain_dict) != 0 and len(obtain_dict) == 2:
            username = obtain_dict["username"]
            password = obtain_dict["password"]

            # 驗證是否合法
            is_true = CheckParameters(username,password)
            if is_true == True:

                # 查詢是否存在該用戶
                select = RunSqlite("./database.db", "UserAuthDB", "select", "username,password", f"username='{username}'")
                if select[0][0] == username and select[0][1] == password:
                    # 查詢Session列表是否存在
                    select_session = RunSqlite("./database.db","SessionAuthDB","select","token",f"username='{username}'")
                    if select_session != []:
                        ref = {"message": ""}
                        ref["message"] = select_session[0][0]
                        return json.dumps(ref, ensure_ascii=False)

                    # Session不存在則需要重新生成
                    else:
                        # 生成並寫入token和過期時間戳
                        token = ''.join(random.sample(string.ascii_letters + string.digits, 32))

                        # 設置360秒周期,過期時間
                        time_stamp = int(time.time()) + 360

                        insert = RunSqlite("./database.db", "SessionAuthDB", "insert", "username,token,invalid_date", f"'{username}','{token}',{time_stamp}")
                        if insert == True:
                            ref = {"message": ""}
                            ref["message"] = token
                            return json.dumps(ref, ensure_ascii=False)
                else:
                    return json.dumps("{'message': '用戶名或密碼錯誤'}", ensure_ascii=False)
            else:
                return json.dumps("{'message': '輸入參數不可用'}", ensure_ascii=False)

    return json.dumps("{'message': '未知錯誤'}", ensure_ascii=False)

# 檢查登錄狀態 token是否過期的裝飾器
def login_check(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("處理登錄邏輯部分: {}".format(request.url))

        # 得到token 驗證是否登陸了,且token沒有過期
        local_timestamp = int(time.time())
        get_token = request.headers.get("token")

        # 驗證傳入參數是否合法
        if CheckParameters(get_token) == True:
            select = RunSqlite("./database.db","SessionAuthDB","select","token,invalid_date",f"token='{get_token}'")
            print(select)
            # 判斷是否存在記錄,如果存在,在判斷時間戳是否合理
            if select != []:
                # 如果當前時間與資料庫比對,大於說明過期了需要刪除原來的,讓用戶重新登錄
                if local_timestamp >= int(select[0][1]):
                    print("時間戳過期了")
                    # 刪除原來的Token
                    delete = RunSqlite("./database.db","SessionAuthDB","delete",f"token='{get_token}'","none")
                    if delete == True:
                        return json.dumps("{'token': 'Token 已過期,請重新登錄獲取'}", ensure_ascii=False)
                    else:
                        return json.dumps("{'token': '資料庫刪除異常,請聯繫開發者'}", ensure_ascii=False)
                else:
                    # 驗證Token是否一致
                    if select[0][0] == get_token:
                        print("Token驗證正常,繼續執行function_ptr指向代碼.")
                        # 返回到原函數
                        return func(*args, **kwargs)
                    else:
                        print("Token驗證錯誤 {}".format(select))
                        return json.dumps("{'token': 'Token 傳入錯誤'}", ensure_ascii=False)

            # 裝飾器調用原函數
            # function_ptr = func(*args, **kwargs)

        return json.dumps("{'token': 'Token 驗證失敗'}", ensure_ascii=False)
    return wrapper

# 獲取參數函數
@app.route("/GetPage", methods=["POST"])
@login_check
def GetPage():
    if request.method == "POST":
        # 獲取參數信息
        obtain_dict = request.form.to_dict()
        if len(obtain_dict) != 0 and len(obtain_dict) == 1:

            pagename = obtain_dict["pagename"]
            print("查詢名稱: {}".format(obtain_dict["pagename"]))

            # 相應頭的完整寫法
            req = Response(response="ok", status=200, mimetype="application/json")
            req.headers["Content-Type"] = "text/json; charset=utf-8"
            req.headers["Server"] = "LyShark Server 1.0"
            req.data = json.dumps("{'message': 'hello world'}")
            return req
        else:
            return json.dumps("{'message': '傳入參數錯誤,請攜帶正確參數請求'}", ensure_ascii=False)
    return json.dumps("{'token': '未知錯誤'}", ensure_ascii=False)

# 用戶註冊函數
@app.route("/register", methods=["POST"])
def Register():
    if request.method == "POST":
        obtain_dict = request.form.to_dict()
        if len(obtain_dict) != 0 and len(obtain_dict) == 2:

            print("用戶名: {} 密碼: {}".format(obtain_dict["username"], obtain_dict["password"]))
            reg_username = obtain_dict["username"]
            reg_password = obtain_dict["password"]

            # 驗證是否合法
            if CheckParameters(reg_username, reg_password) == False:
                return json.dumps("{'message': '傳入用戶名密碼不合法'}", ensure_ascii=False)

            # 查詢用戶是否存在
            select = RunSqlite("database.db","UserAuthDB","select","id",f"username='{reg_username}'")
            if select != []:
                return json.dumps("{'message': '用戶名已被註冊'}", ensure_ascii=False)
            else:
                insert = RunSqlite("database.db","UserAuthDB","insert","username,password",f"'{reg_username}','{reg_password}'")
                if insert == True:
                    return json.dumps("{'message': '註冊成功'}", ensure_ascii=False)
                else:
                    return json.dumps("{'message': '註冊失敗'}", ensure_ascii=False)
        else:
            return json.dumps("{'message': '傳入參數個數不正確'}", ensure_ascii=False)
    return json.dumps("{'message': '未知錯誤'}", ensure_ascii=False)

# 密碼修改函數
@app.route("/modify", methods=["POST"])
@login_check
def modify():
    if request.method == "POST":
        obtain_dict = request.form.to_dict()
        if len(obtain_dict) != 0 and len(obtain_dict) == 1:

            mdf_password = obtain_dict["password"]
            get_token = request.headers.get("token")
            print("獲取token: {} 修改後密碼: {}".format(get_token,mdf_password))

            # 驗證是否合法
            if CheckParameters(get_token, mdf_password) == False:
                return json.dumps("{'message': '傳入密碼不合法'}", ensure_ascii=False)

            # 先得到token對應用戶名
            select = RunSqlite("./database.db","SessionAuthDB","select","username",f"token='{get_token}'")
            if select != []:
                # 接著直接修改密碼即可
                modify_username = str(select[0][0])
                print("得到的用戶名: {}".format(modify_username))
                update = RunSqlite("database.db","UserAuthDB","update",f"username='{modify_username}'",f"password='{mdf_password}'")
                if update == True:
                    # 刪除原來的token,讓用戶重新獲取
                    delete = RunSqlite("./database.db","SessionAuthDB","delete",f"username='{modify_username}'","none")
                    print("刪除token狀態: {}".format(delete))
                    return json.dumps("{'message': '修改成功,請重新登錄獲取Token'}", ensure_ascii=False)

                else:
                    return json.dumps("{'message': '修改失敗'}", ensure_ascii=False)
            else:
                return json.dumps("{'message': '不存在該Token,無法修改密碼'}", ensure_ascii=False)
        else:
            return json.dumps("{'message': '傳入參數個數不正確'}", ensure_ascii=False)
    return json.dumps("{'message': '未知錯誤'}", ensure_ascii=False)

if __name__ == '__main__':
    app.run(debug=True)

首先需要在Web頁面訪問http://127.0.0.1/create路徑實現對資料庫的初始化,並打開Postman工具,通過傳入參數來使用這個案例。

文章出處:https://www.cnblogs.com/LyShark/p/17859045.html
本博客所有文章除特別聲明外,均採用 BY-NC-SA 許可協議。轉載請註明出處!
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Cameo項目介紹: 1、實時捕獲並顯示攝像頭幀。 2、具備截圖、保存視頻和退出三個功能鍵。 要求存在文件:manager.py 和 cameo.py 一、manager.py 兩個類:CaptureManager、WindowManager CaptureManager負責攝像頭幀的捕獲,編解碼得 ...
  • 題目 給你一個數組 nums 和一個值 val,你需要 原地 移除所有數值等於 val 的元素,並返回移除後數組的新長度。 不要使用額外的數組空間,你必須僅使用 O(1) 額外空間並 原地 修改輸入數組。 元素的順序可以改變。你不需要考慮數組中超出新長度後面的元素。 說明: 為什麼返回數值是整數,但 ...
  • Flask-SocketIO 是基於 Flask 的一個擴展,用於簡化在 Flask 應用中集成 WebSocket 功能。WebSocket 是一種在客戶端和伺服器之間實現實時雙向通信的協議,常用於實現實時性要求較高的應用,如聊天應用、實時通知等,使得開發者可以更輕鬆地構建實時性要求較高的應用。通... ...
  • keycloak可以幫助我們實現這個功能:用戶token每5分鐘失效一次,失效後通過refresh_token來換新的token,而refresh_token每30天失效一次,但如果用戶3天都沒有任何操作(就是沒有用refresh_token去換新的token),那麼3天後也讓refresh_tok ...
  • 1 簡介 Spring Data Redis是 Spring Data 系列的一部分,它提供了Spring應用程式對Redis的輕鬆配置和使用。它不僅提供了對Redis操作的高級抽象,還支持Jedis和Lettuce兩種連接方式。 可通過簡單的配置就能連接Redis,並且可以切換Jedis和Lett ...
  • PlayImage 記得一鍵三連哦 一個使用簡單的QPainter繪圖事件實現圖片播放器的簡易demo 支持圖片切換 支持多路更新,自己擴展即可 支持幻燈片播放 PlayImage自定義控制項支持復用,對外提供updateImage和updatePixmap介面,對傳入的image和pixmap進行圖 ...
  • Flask前後端數據動態交互涉及用戶界面與伺服器之間的靈活數據傳遞。用戶界面使用ECharts圖形庫實時渲染數據。它提供了豐富多彩、交互性強的圖表和地圖,能夠在網頁上直觀、生動地展示數據。ECharts支持各種常見的圖表類型,包括折線圖、柱狀圖、餅圖、散點圖等,同時還支持動畫效果、數據篩選、區域縮放... ...
  • Redis以其速度而聞名。 1 業務數據緩存 1.1 通用數據緩存 string,int,list,map。Redis 最常見的用例是緩存對象以加速 Web 應用程式。 此用例中,Redis 將頻繁請求的數據存儲在記憶體。允許 Web 伺服器快速返回頻繁訪問的數據。這減輕資料庫的負載並提高應用程式RT ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...