併發伺服器幾種實現方法總結

来源:https://www.cnblogs.com/NolaLi/archive/2017/12/28/8137973.html
-Advertisement-
Play Games

今天主題是實現併發伺服器,實現方法有多種版本,先從簡單的單進程代碼實現到多進程,多線程的實現,最終引入一些高級模塊來實現併發TCP伺服器。 說到TCP,想起吐槽大會有個段子提到三次握手,也只有程式猿(媛)能get。 UDP伺服器數據傳輸不可靠,這裡就忽略了。 >>: 簡單的單進程TCP伺服器 假代碼 ...


今天主題是實現併發伺服器,實現方法有多種版本,先從簡單的單進程代碼實現到多進程,多線程的實現,最終引入一些高級模塊來實現併發TCP伺服器。

說到TCP,想起吐槽大會有個段子提到三次握手,也只有程式猿(媛)能get。

UDP伺服器數據傳輸不可靠,這裡就忽略了。

>>:

簡單的單進程TCP伺服器

假代碼:

#創建tcp伺服器套接字

#綁定埠

#設置正常情況退出的伺服器下,埠可以重用

#設置監聽,變為主動監聽

# 等待客戶端的鏈接,返回新的socket和地址

#關閉tcp伺服器套接字

from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR

#創建tcp伺服器套接字
server_socket = socket(AF_INET,SOCK_STREAM)
#綁定埠
server_socket.bind(("",9999))
#設置正常情況退出的伺服器下,埠可以重用
server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
#設置監聽,變為主動監聽
server_socket.listen(5)

while True:
   # 等待客戶端的鏈接,返回新的socket和地址
   new_socket,new_address = server_socket.accept()
   #接收數據,並且發送數據
   try:
      while True:
         recv_data = new_socket.recv(1024)
         #當有客戶端關閉後,recv解除阻塞,並且返回長度為0
         if len(recv_data) > 0:
            recv_content = recv_data.decode("gb2312")
            print("收到:%s的信息是:%s" % (str(new_address),recv_content))
            new_socket.send("thank you!".encode("gb2312"))
         else:
            print("客戶端%s已經關閉" % (str(new_address)))
            break
   finally:
      new_socket.close()
      print("關閉%s客戶端" % (str(new_address)))

#關閉tcp伺服器套接字
server_socket.close()

 

多進程TCP伺服器

from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR
from multiprocessing import Process

#在子進程中接收消息
def recv_data(new_socket,new_address):
   while True:
      recv_data = new_socket.recv(1024)
      # 當有客戶端關閉後,recv解除阻塞,並且返回長度為0
      if len(recv_data) > 0:
         recv_content = recv_data.decode("gb2312")
         print("收到:%s的信息是:%s" % (str(new_address), recv_content))
         new_socket.send("thank you!".encode("gb2312"))
      else:
         print("客戶端%s已經關閉" % (str(new_address)))
         break
   #關閉與客戶端的連接
   print("關閉與客戶端的連接")
   new_socket.close()

def main():
   #創建tcp伺服器套接字
   server_socket = socket(AF_INET,SOCK_STREAM)
   #綁定埠
   server_socket.bind(("",8888))
   #設置正常情況退出的伺服器下,埠可以重用
   server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)

   #設置監聽,變為被動連接
   server_socket.listen(3)

   try:
      while True:
         # 等待客戶端的鏈接,返回新的socket和地址
         new_socket,new_address = server_socket.accept()
         #接收數據,並且發送數據
         Process(target=recv_data,args=(new_socket,new_address)).start()
         #因為主進程和子進程不共用數據
         #如果我們直接關閉new_socket,只是關閉主進程的new_socket,而子進程的不受影響
         new_socket.close()
   finally:
      #關閉tcp伺服器套接字
      server_socket.close()

if __name__ == "__main__":
   main()

 

 多進程TCP伺服器

from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR
from multiprocessing import Process

#在子進程中接收消息
def recv_data(new_socket,new_address):
   while True:
      recv_data = new_socket.recv(1024)
      # 當有客戶端關閉後,recv解除阻塞,並且返回長度為0
      if len(recv_data) > 0:
         recv_content = recv_data.decode("gb2312")
         print("收到:%s的信息是:%s" % (str(new_address), recv_content))
         new_socket.send("thank you!".encode("gb2312"))
      else:
         print("客戶端%s已經關閉" % (str(new_address)))
         break
   #關閉與客戶端的連接
   print("關閉與客戶端的連接")
   new_socket.close()

def main():
   #創建tcp伺服器套接字
   server_socket = socket(AF_INET,SOCK_STREAM)
   #綁定埠
   server_socket.bind(("",8888))
   #設置正常情況退出的伺服器下,埠可以重用
   server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)

   #設置監聽,變為被動連接
   server_socket.listen(3)

   try:
      while True:
         # 等待客戶端的鏈接,返回新的socket和地址
         new_socket,new_address = server_socket.accept()
         #接收數據,並且發送數據
         Process(target=recv_data,args=(new_socket,new_address)).start()
         #因為主進程和子進程不共用數據
         #如果我們直接關閉new_socket,只是關閉主進程的new_socket,而子進程的不受影響
         new_socket.close()
   finally:
      #關閉tcp伺服器套接字
      server_socket.close()

if __name__ == "__main__":
   main()

 

 多線程TCP伺服器

from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR
from threading import Thread

#接收消息
def recv_data(new_socket,new_address):
   while True:
      recv_data = new_socket.recv(1024)
      # 當有客戶端關閉後,recv解除阻塞,並且返回長度為0
      if len(recv_data) > 0:
         recv_content = recv_data.decode("gb2312")
         print("收到:%s的信息是:%s" % (str(new_address), recv_content))
         new_socket.send("thank you!".encode("gb2312"))
      else:
         print("客戶端%s已經關閉" % (str(new_address)))
         break

def main():
   #創建tcp伺服器套接字
   server_socket = socket(AF_INET,SOCK_STREAM)
   #綁定埠
   server_socket.bind(("",9999))
   #設置正常情況退出的伺服器下,埠可以重用
   server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)

   #設置監聽,變為被動連接
   server_socket.listen(3)

   try:
      while True:
         # 等待客戶端的鏈接,返回新的socket和地址
         new_socket,new_address = server_socket.accept()
         #接收數據,並且發送數據
         Thread(target=recv_data,args=(new_socket,new_address)).start()
   finally:
      #關閉tcp伺服器套接字
      server_socket.close()

if __name__ == "__main__":
   main()

 

多任務協程實現 ——

greenlet和gevent 

 

#coding=utf-8
from greenlet import greenlet
import time
def test1():
    while True:
        print "---A--"
        gr2.switch()
        time.sleep(0.5)
def test2():
    while True:
        print "---B--"
        gr1.switch()
        time.sleep(0.5)
gr1 = greenlet(test1)
gr2 = greenlet(test2)
#切換到gr1中運行
gr1.switch()

-----------------------------------------------

import gevent

#函數
def f(n):
   for i in range(n):
      print("%s:%s" % (gevent.getcurrent(),i))



f1 = gevent.spawn(f,5)
f2 = gevent.spawn(f,5)
f3 = gevent.spawn(f,5)

#讓主線程等待三個協程執行完畢,否則沒有機會執行
f1.join()
f2.join()
f3.join()

#可以看到,3個greenlet是依次運行而不是交替運行。要讓greenlet交替運行,可以通過gevent.sleep()交出控制權。

 --------------------------------------------------

#coding=utf-8
import gevent
def f(n):
    for i in range(n):
        print gevent.getcurrent(), i
        #用來模擬一個耗時操作,註意不是time模塊中的sleep
        gevent.sleep(1)

g1 = gevent.spawn(f, 5)
g2 = gevent.spawn(f, 5)
g3 = gevent.spawn(f, 5)
#下麵三行代碼意思:主線程等待各個協成支持完,否則協成沒有機會執行
g1.join()
g2.join()
g3.join()

 

單進程TCP伺服器 ——

非堵塞式

 

from socket import AF_INET,socket,SO_REUSEADDR,SOCK_STREAM,SOL_SOCKET

def main():

   #創建tcp的socket套接字
   server_socket = socket(AF_INET,SOCK_STREAM)
   server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)

   #綁定埠
   server_socket.bind(("",9999))

   #設置非阻塞,也就是說accept方法不阻塞了,
   # 但是在沒有客戶端鏈接且被執行的時候會報錯
   #有客戶端鏈接的時候正常執行
   server_socket.setblocking(False)

   #設置監聽
   server_socket.listen(5)

   #客戶端列表
   client_lists = []
   try:

      #不斷調用accept
      while True:
         try:
            # print("accept--111")
            new_socket,new_address = server_socket.accept()
            print("accept--2222")
         except Exception as result:
            # print(result)
            pass
         else:
            print("新的客戶%s鏈接上" % str(new_address))
            #新鏈接的new_sokect預設也是阻塞,也設置為非阻塞後,recv為非阻塞
            new_socket.setblocking(False)
            client_lists.append((new_socket,new_address))
         # print(111)
         for client_sokect,client_address in client_lists:
            #接收數據
            try:
               recv_data = client_sokect.recv(1024)
            except Exception as result:
               # print(result)
               pass
            else:
               # print("正常數據:%s" %recv_data)
               if len(recv_data) > 0 :
                  print("收到%s:%s" % (str(client_address),recv_data))
                  client_sokect.send("thank you!".encode("gb2312"))
               else:
                  #客戶端已經埠,要把該客戶端從列表中異常
                  client_lists.remove((client_sokect,new_address))
                  client_sokect.close()
                  print("%s已經斷開" % str(new_address))

   finally:
      #關閉套接字
      server_socket.close()

if __name__ == "__main__":
   main()

 

單進程TCP伺服器 ——

select版

 

select 原理

其他語言(c或者c++)也有使用select實現多任務伺服器。

select 能夠完成一些套接字的檢查,從頭到尾檢查一遍後,標記哪些套接字是否可以收數據,返回的時候,就返回能接收數據的套接字,返回的是列表。select是由操作系統提供的,效率要高些,非常快的方式檢測哪些套接字可以接收數據。select是跨平臺的,在window也可以用。

io多路復用:沒有使用多進程和多線程的情況下完成多個套接字的使用

from socket import AF_INET,socket,SO_REUSEADDR,SOCK_STREAM,SOL_SOCKET
from select import select
import sys

def main():

   #創建tcp的socket套接字
   server_socket = socket(AF_INET,SOCK_STREAM)
   server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)

   #綁定埠
   server_socket.bind(("",9999))

   #設置監聽
   server_socket.listen(5)

   #客戶端列表
   socket_lists = [server_socket,sys.stdin]
   wirte_list = []
   #是否退出
   is_run = False
   try:

      while True:
         #檢測列表client_lists那些socket可以接收數據,
         #檢測列表[]那些套接字(socket)可否發送數據
         #檢測列表[]那些套接字(socket)是否產生了異常
         print("select--111")
         #這個select函數預設是堵塞,當有客戶端鏈接的時候解除阻塞,
         # 當有數據可以接收的時候解除阻塞,當客戶端斷開的時候解除阻塞
         readable, wirteable,excep = select(socket_lists,wirte_list,[])
         # print("select--2222")
         # print(111)
         for sock in wirteable:
            #這個會一直發送,因為他是處於已經發的狀態
            sock.send("thank you!".encode("gb2312"))
         for sock in readable:
            #接收數據
            if sock == server_socket:
               print("sock == server_socket")
               #有新的客戶端鏈接進來
               new_socket,new_address = sock.accept()
               #新的socket添加到列表中,便於下次socket的時候能檢查到
               socket_lists.append(new_socket)
            elif sock == sys.stdin:
               cmd = sys.stdin.readline()
               print(cmd)
               is_run = cmd
            else:
               # print("sock.recv(1024)....")
               #此時的套接字sock是直接可以取數據的
               recv_data = sock.recv(1024)
               if len(recv_data) > 0:
                  print("從[%s]:%s" % (str(new_address),recv_data))
                  sock.send(recv_data)
                  #把鏈接上有消息接收的socket添加到監聽寫的列表中
                  wirte_list.append(sock)
               else:
                  print("客戶端已經斷開")
                  #客戶端已經斷開,要移除
                  sock.close()
                  socket_lists.remove(sock)

         #是否退出程式
         if is_run:
            break

   finally:
      #關閉套接字
      server_socket.close()


if __name__ == "__main__":
   main()

 

單進程TCP伺服器 ——

epoll版

 

from socket import *
import select

def main():

   #創建tcp伺服器套接字
   server_socket = socket(AF_INET,SOCK_STREAM)
   #設置埠可以重用
   server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
   #綁定埠
   server_socket.bind(("",9999))
   #設置監聽
   server_socket.listen(5)

   #用epoll設置監聽收數據
   epoll = select.epoll()
   #把server_socket註冊到epoll的事件監聽中,如果已經註冊過會發生異常
   epoll.register(server_socket.fileno(),select.EPOLLIN|select.EPOLLET)
   #裝socket列表
   socket_lists = {}
   #裝socket對應的地址
   socket_address = {}
   while True:
      #返回套接字列表[(socket的文件描述符,select.EPOLLIN)],
      # 如果有新的鏈接,有數據發過來,斷開鏈接等都會解除阻塞
      print("epoll.poll--111")
      epoll_list = epoll.poll()
      print("epoll.poll--222")
      print(epoll_list)
      for fd,event in epoll_list:
         #有新的鏈接
         if fd == server_socket.fileno():
            print("新的客戶fd==%s" % fd)
            new_sokect,new_address = server_socket.accept()
            #往字典添加數據
            socket_lists[new_sokect.fileno()] = new_sokect
            socket_address[new_sokect.fileno()] = new_address
            #註冊新的socket也註冊到epoll的事件監聽中
            epoll.register(new_sokect.fileno(), select.EPOLLIN | select.EPOLLET)
         elif event ==select.EPOLLIN:
            print("收到數據了")
            #根據文件操作符取出對應socket
            new_sokect = socket_lists[fd]
            address = socket_address[fd]
            recv_data = new_sokect.recv(1024)
            if len(recv_data) > 0:
               print("已經收到[%s]:%s" % (str(address),recv_data.decode("gb2312")))
            else:
               #客戶端埠,取消監聽
               epoll.unregister(fd)
               #關閉鏈接
               new_sokect.close()
               print("[%s]已經下線" % str(address))



   #關閉套接字鏈接
   server_socket.close()

if __name__ == "__main__":
   main()

 

單進程TCP伺服器 ——

gevent版

 

gevent原理

greenlet已經實現了協程,但是這個還得人工切換,是不是覺得太麻煩了,莫要捉急,python還有一個比greenlet更強大的並且能夠自動切換任務的模塊gevent 

原理------當一個greenlet遇到IO(指的是input output 輸入輸出,比如網路、文件操作等)操作時,比如訪問網路,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。

由於IO操作非常耗時,經常使程式處於等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在運行,而不是等待IO.

import sys
import time
import gevent
from gevent import socket,monkey
monkey.patch_all()
def handle_request(conn):
    while True:
        data = conn.recv(1024)
        if not data:
            conn.close()
            break
        print("recv:", data)
        conn.send(data)

def server(port):
    s = socket.socket()
    s.bind(('', port))
    s.listen(5)
    while True:
        newSocket, addr = s.accept()
        gevent.spawn(handle_request, newSocket)
if __name__ == '__main__':
    server(7788)

 

首先基於以上代碼模塊,撒點概念問題:

1.什麼是協程?

協程:存線上程中,是比線程更小的執行單元,又稱微線程,纖程。自帶cpu上下文,操作協程由程式員決定,它可以將一個線程分解為多個微線程,每個協程間共用全局空間的變數,每秒鐘切換頻率高達百萬次。

2. 什麼是計算密集型IO密集型

計算密集型:要進行大量的計算,消耗cpu資源。如複雜計算,對視頻進行高清解碼等,全靠cpu的運算能力。而計算密集型任務完成多任務切換任務比較耗時,cpu執行任務效率就越低。在python中,多進程適合計算密集型任務。

IO密集型:涉及到網路、磁碟io的任務都是io密集型。cpu消耗少,計算量小,如請求網頁,讀寫文件等。在python中,使用sleep達到IO密集型任務的目的,多線程適合IO密集型任務。

 

各大實現版本對比:

select:

1)支持跨平臺,最大缺陷是單個進程打開的FD是有限的,由FD_SETSIZE設置,預設是1024;

2)socket掃描時是線性掃描,及採用輪詢方式,效率低;

3)需要維護一個存放大量FD的數據結構,使得用戶空間和內核空間在傳遞該數據結構時複製開銷大。

poll: 

1)poll與select本質上沒有區別,但poll沒有最大連接數的限制;

2)大量的fd數組被整體複製於用戶態和內核地址空間之間,不管這樣的複製是不是有意義;

3)‘水平觸發’,如果報告了fd後,沒有被處理,下次poll時還會再次報告該fd。

epoll:

1)是之前poll和select的增強版,epoll更靈活,沒有描述符限制,能打開的fd遠大於1024(1G的記憶體上能監聽約10萬個埠);

2)‘邊緣出發’,事件通知機制,效率提升,最大的特點在於它只管你活躍的連接,而跟連接總數無關。而epoll對文件描述符的操作模式之一ET是一種高效的工作方式,很大程度減少事件反覆觸發的次數,內核不會發送更多的通知(only once)。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Django框架基礎 這是我學習北京理工大學嵩天老師的《Python雲端系統開發入門》課程的筆記,在此我特別感謝老師的精彩講解和對我的引導。 1、Django簡介與安裝 Django是一個免費、開源的Web應用框架,由Python寫成。採用了MTV(Model-Template-View)的框架模式 ...
  • #裡面內容沒有見過,可能會比較難懂,需要找資料。我只是記錄了視頻中的用法,其他理解的東西,我直接理解,就沒有寫下來了。下麵內容是視頻演示過程 import hashlibm = hashlib.md5()print(m) # 只是一個加密對象m.update('aiq'.encode('utf-8' ...
  • 為了和python解釋器交互,控制台執行腳本後面添加變數import sysprint(sys.argv) def post(): print('upload')def download(): print('download')if sys.argv[1] == 'post': post()elif ...
  • 關於《Head First Python》一書中print_lol()函數的思考 在《Head First Python》第一章中,講述到Python處理複雜數據(以電影數據列表為例),首先將電影數據創建為Python列表,由於Python的變數標識符沒有類型,列表中的每一個數據項可以是任何類型的數 ...
  • Struts的簡單搭建(入門) 過程摘要:(struts2下載:https://struts.apache.org/) (軟體要求:安裝好eclipse/myeclipse和tomcat) 具體流程: 創建一個新的project,選擇動態web工程: (註意:若此時出現.jsp頁面找不到java b ...
  • 本文算是副產品,正品是利用FFmpeg從任意視頻中生成GIF片段的小程式,寫完了就發。 因為要對視頻畫面進行框選,再生成GIF,所以得有個框選的控制項,可Delphi里沒有啊,只好自己寫一個了。 聲明 本文參考的是盒子網的 "RectTracker" ,原作者署名xwwaw,發佈於2007年5月28日 ...
  • 演示不使用事務出現異常情況 Dao層兩個方法lessMoney()和moreMoney() Service層調用兩個方法 但是兩個操作減與加之間,如果出現異常,則會導致轉賬錢已經轉了,但對方卻沒有到賬的bug,可能伺服器突然故障等引起 解決添加事務,出現異常進行回滾操作 下麵使用配置文件的方法進行事 ...
  • solr 除了能查詢文檔中的數據外, 還可以導入資料庫中的數據. 也就是說, solr 能查詢其他資料庫中的數據(solr本身也是一個資料庫, 非關係型的). 那接下來就試一下導入mysql資料庫中的數據. 一. 準備工作 1. 在solr_core下麵新建lib文件夾. 然後將以下jar包拷貝進去 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...