Python動態伺服器網頁(需要使用WSGI介面),基本實現步驟如下: 1.等待客戶端的鏈接,伺服器會收到一個http協議的請求數據報 2.利用正則表達式對這個請求數據報進行解析(請求方式、提取出文件的環境) 3.提取出文件的環境之後,利用截斷取片的方法將文件名轉化為模塊名稱 4.使用m = __i ...
Python動態伺服器網頁(需要使用WSGI介面),基本實現步驟如下:
1.等待客戶端的鏈接,伺服器會收到一個http協議的請求數據報
2.利用正則表達式對這個請求數據報進行解析(請求方式、提取出文件的環境)
3.提取出文件的環境之後,利用截斷取片的方法將文件名轉化為模塊名稱
4.使用m = __import__()
,就可以得到返回值為m的模塊
5.創建一個env字典:其中包含的是請求方式及文件環境等各種的鍵值對
6.創建一個新的動態腳本,其中定義了application這個函數,必須包含env和start_response的參數(也是伺服器里的調用方法)
7.在這個動態腳本中定義狀態碼status和響應頭headers(註意是字典形式,如Content-Type)
8.然後再調用start_response(status,headers),但是要註意,這個函數在伺服器被定義
9.在動態腳本中編寫動態執行程式
10.m.appliction的返回值就是回應數據包的body,它的數據頭在start_response被整合
11.將數據頭與數據body拼接起來,然後發送給客戶端,就可顯示動態網頁
MyWebServer
import socket
import re
import sys
from multiprocessing import Process
from MyWebFramework import Application
# 設置靜態文件根目錄
HTML_ROOT_DIR = "./html"
WSGI_PYTHON_DIR = "./wsgipython"
class HTTPServer(object):
""""""
def __init__(self, application):
"""構造函數, application指的是框架的app"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.app = application
def start(self):
self.server_socket.listen(128)
while True:
client_socket, client_address = self.server_socket.accept()
#print("[%s,%s]用戶連接上了" % (client_address[0],client_address[1]))
print("[%s, %s]用戶連接上了" % client_address)
handle_client_process = Process(target=self.handle_client, args=(client_socket,))
handle_client_process.start()
client_socket.close()
def start_response(self, status, headers):
"""
status = "200 OK"
headers = [
("Content-Type", "text/plain")
]
star
"""
response_headers = "HTTP/1.1 " + status + "\r\n"
for header in headers:
response_headers += "%s: %s\r\n" % header
self.response_headers = response_headers
def handle_client(self, client_socket):
"""處理客戶端請求"""
# 獲取客戶端請求數據
request_data = client_socket.recv(1024)
print("request data:", request_data)
request_lines = request_data.splitlines()
for line in request_lines:
print(line)
# 解析請求報文
# 'GET / HTTP/1.1'
request_start_line = request_lines[0]
# 提取用戶請求的文件名
print("*" * 10)
print(request_start_line.decode("utf-8"))
file_name = re.match(r"\w+ +(/[^ ]*) ", request_start_line.decode("utf-8")).group(1)
method = re.match(r"(\w+) +/[^ ]* ", request_start_line.decode("utf-8")).group(1)
env = {
"PATH_INFO": file_name,
"METHOD": method
}
response_body = self.app(env, self.start_response)
response = self.response_headers + "\r\n" + response_body
# 向客戶端返迴響應數據
client_socket.send(bytes(response, "utf-8"))
# 關閉客戶端連接
client_socket.close()
def bind(self, port):
self.server_socket.bind(("", port))
def main():
sys.path.insert(1, WSGI_PYTHON_DIR)
if len(sys.argv) < 2:
sys.exit("python MyWebServer.py Module:app")
# python MyWebServer.py MyWebFrameWork:app
module_name, app_name = sys.argv[1].split(":")
# module_name = "MyWebFrameWork"
# app_name = "app"
m = __import__(module_name)
app = getattr(m, app_name)
http_server = HTTPServer(app)
# http_server.set_port
http_server.bind(8000)
http_server.start()
if __name__ == "__main__":
main()
MyWebFrameWork
import time
# from MyWebServer import HTTPServer
# 設置靜態文件根目錄
HTML_ROOT_DIR = "./html"
class Application(object):
"""框架的核心部分,也就是框架的主題程式,框架是通用的"""
def __init__(self, urls):
# 設置路由信息
self.urls = urls
def __call__(self, env, start_response):
path = env.get("PATH_INFO", "/")
# /static/index.html
if path.startswith("/static"):
# 要訪問靜態文件
file_name = path[7:]
# 打開文件,讀取內容
try:
file = open(HTML_ROOT_DIR + file_name, "rb")
except IOError:
# 代表未找到路由信息,404錯誤
status = "404 Not Found"
headers = []
start_response(status, headers)
return "not found"
else:
file_data = file.read()
file.close()
status = "200 OK"
headers = []
start_response(status, headers)
return file_data.decode("utf-8")
for url, handler in self.urls:
#("/ctime", show_ctime)
if path == url:
return handler(env, start_response)
# 代表未找到路由信息,404錯誤
status = "404 Not Found"
headers = []
start_response(status, headers)
return "not found"
def show_ctime(env, start_response):
status = "200 OK"
headers = [
("Content-Type", "text/plain")
]
start_response(status, headers)
return time.ctime()
def say_hello(env, start_response):
status = "200 OK"
headers = [
("Content-Type", "text/plain")
]
start_response(status, headers)
return "hello itcast"
def say_haha(env, start_response):
status = "200 OK"
headers = [
("Content-Type", "text/plain")
]
start_response(status, headers)
return "hello haha"
urls = [
("/", show_ctime),
("/ctime", show_ctime),
("/sayhello", say_hello),
("/sayhaha", say_haha),
]
app = Application(urls)
# if __name__ == "__main__":
# urls = [
# ("/", show_ctime),
# ("/ctime", show_ctime),
# ("/sayhello", say_hello),
# ("/sayhaha", say_haha),
# ]
# app = Application(urls)
# http_server = HTTPServer(app)
# http_server.bind(8000)
# http_server.start()