操作系統為 Ubuntu 16.04,OpenCV 版本為opencv-python 3.4.1.15。 你可以在我的 Github 上找到 Windows 系統和 Linux 系統對應的源代碼,此教程對應的版本是 v0.2。目前我正在開發的版本是 v0.3,新版本將允許使用不同IP協議的主機通信, ...
操作系統為 Ubuntu 16.04
,OpenCV 版本為opencv-python 3.4.1.15
。
你可以在我的 Github 上找到 Windows 系統和 Linux 系統對應的源代碼,此教程對應的版本是 v0.2。目前我正在開發的版本是 v0.3,新版本將允許使用不同IP協議的主機通信,並且範圍不再局限於區域網內。這個工具最初是為了通過IPv6節省聊天工具使用的流量而開發的。
2. 內容簡介
- 本實驗實現簡易的視頻通信工具
- 在視頻通信的基礎上加入語音
- 用戶可以選擇通信的質量,即畫質、停頓等參數
- 支持IPv6
3. 實驗知識點
本課程項目完成過程中將學習:
- Python 基於 OpenCV 對攝像頭信息的捕獲和壓縮
- Python 關於 線程 和 socket 通信的一些基礎技巧
- Python 基於 PyAudio 對語音信息的捕獲和壓縮
其中將重點介紹 socket 傳輸過程中對數據的壓縮和處理。
4.實驗環境
- python 3.5
- opencv-python 3.4.1.15
- numpy 1.14.5
- PyAudio 0.2.11
二、環境搭建
通過以下命令可下載項目源碼,作為參照對比完成下麵詳細步驟的學習。
$ cd Code $ wget https://labfile.oss.aliyuncs.com/courses/672/ichat.zip $ unzip ichat.zip
現在開始下載環境依賴的包,確保在剛在解壓文件下的目錄里運行。
$ cd ichat
$ sudo pip3 install numpy
$ sudo pip3 install opencv_python
這一步下載了我們需要的opencv-python和numpy兩個包。
剩下的PyAudio,由於本虛擬環境的部分問題,我們單獨分開下載。
$ sudo apt-get install portaudio19-dev python-all-dev python3-all-dev
$ sudo pip3 install pyaudio==0.2.11
現在,我們的實驗環境就搭好了。
三、實驗原理
實驗實現了簡易的視頻通信工具,基於 OpenCV 和 PyAudio,使用 TCP 協議通信,通信雙方建立雙向 CS 連接,雙方均維護一個客戶端和一個伺服器端。在捕獲視頻信息後,根據用戶指定的參數對畫面做壓縮並傳輸。
四、實驗步驟
接下來我們分步驟講解本實驗。
4.1 實現雙向 C/S 連接
先為雙方的通信設計 Server 類和 Client類,兩個類均繼承 threading.Thread
,只需要分別實現 __init__
、__del__
和run
方法,之後對象調用.start()
方法即可在獨立線程中執行run
方法中的內容。首先Client
類需要存儲遠端的IP地址和埠,而Server
類需要存儲本地伺服器監聽的埠號。用戶還應當可以指定通信雙方使用的協議版本,即基於IPv4 還是IPv6 的TCP連接。因此Server
類的初始化需要傳入兩個參數(埠、版本),Client
類的初始化需要三個參數(遠端IP、埠、版本)。新建文件vchat.py
,在其中定義基礎的兩個類如下。
1 from socket import * 2 import threading 3 class Video_Server(threading.Thread): 4 def __init__(self, port, version) : 5 threading.Thread.__init__(self) 6 self.setDaemon(True) 7 self.ADDR = ('', port) 8 if version == 4: 9 self.sock = socket(AF_INET ,SOCK_STREAM) 10 else: 11 self.sock = socket(AF_INET6 ,SOCK_STREAM) 12 def __del__(self): 13 self.sock.close() 14 # TODO 15 def run(self): 16 print("server starts...") 17 self.sock.bind(self.ADDR) 18 self.sock.listen(1) 19 conn, addr = self.sock.accept() 20 print("remote client success connected...") 21 # TODO 22 23 class Video_Client(threading.Thread): 24 def __init__(self ,ip, port, version): 25 threading.Thread.__init__(self) 26 self.setDaemon(True) 27 self.ADDR = (ip, port) 28 if version == 4: 29 self.sock = socket(AF_INET, SOCK_STREAM) 30 else: 31 self.sock = socket(AF_INET6, SOCK_STREAM) 32 def __del__(self) : 33 self.sock.close() 34 # TODO 35 def run(self): 36 print("client starts...") 37 while True: 38 try: 39 self.sock.connect(self.ADDR) 40 break 41 except: 42 time.sleep(3) 43 continue 44 print("client connected...") 45 # TODO
4.2 實現攝像頭數據流捕獲
OpenCV 為 Python 提供的介面非常簡單並且易於理解。捕獲視頻流的任務應當由Client
類完成,下麵完善Client
的run
函數。在下麵的代碼中,我們為類添加了一個成員變數cap
,它用來捕獲預設攝像頭的輸出。
1 class Video_Client(threading.Thread): 2 def __init__(self ,ip, port, version): 3 threading.Thread.__init__(self) 4 self.setDaemon(True) 5 self.ADDR = (ip, port) 6 if version == 4: 7 self.sock = socket(AF_INET, SOCK_STREAM) 8 else: 9 self.sock = socket(AF_INET6, SOCK_STREAM) 10 self.cap = cv2.VideoCapture(0) 11 def __del__(self) : 12 self.sock.close() 13 self.cap.release() 14 def run(self): 15 print("client starts...") 16 while True: 17 try: 18 self.sock.connect(self.ADDR) 19 break 20 except: 21 time.sleep(3) 22 continue 23 print("client connected...") 24 while self.cap.isOpened(): 25 ret, frame = self.cap.read() 26 # TODO
4.3 發送捕獲到的數據到伺服器
已經捕獲到數據,接下來要發送位元組流。首先我們繼續編寫Client
,為其添加發送數據功能的實現。這裡只改動了run
方法。在捕獲到幀後,我們使用pickle.dumps
方法對其打包,並用sock.sendall
方法發送。註意發送過程中我們用struct.pack
方法為每批數據加了一個頭,用於接收方確認接受數據的長度。
1 def run(self): 2 while True: 3 try: 4 self.sock.connect(self.ADDR) 5 break 6 except: 7 time.sleep(3) 8 continue 9 print("client connected...") 10 while self.cap.isOpened(): 11 ret, frame = self.cap.read() 12 data = pickle.dumps(frame) 13 try: 14 self.sock.sendall(struct.pack("L", len(data)) + data) 15 except: 16 break
下麵編寫Server
,在伺服器端連接成功後,應當創建一個視窗用於顯示接收到的視頻。因為連接不一定創建成功,因此cv.destroyAllWindows()
被放在一個try..catch
塊中防止出現錯誤。在接收數據過程中,我們使用payload_size
記錄當前從緩衝區讀入的數據長度,這個長度通過struct.calcsize('L')
來讀取。使用該變數的意義在於緩衝區中讀出的數據可能不足一個幀,也可能由多個幀構成。為了準確提取每一幀,我們用payload_size
區分幀的邊界。在從緩衝區讀出的數據流長度超過payload_size
時,剩餘部分和下一次讀出的數據流合併,不足payload_size
時將合併下一次讀取的數據流到當前幀中。在接收完完整的一幀後,顯示在創建的視窗中。同時我們為視窗創建一個鍵盤響應,當按下Esc
或 q
鍵時退出程式。
class Video_Server(threading.Thread): def __init__(self, port, version) : threading.Thread.__init__(self) self.setDaemon(True) self.ADDR = ('', port) if version == 4: self.sock = socket(AF_INET ,SOCK_STREAM) else: self.sock = socket(AF_INET6 ,SOCK_STREAM) def __del__(self): self.sock.close() try: cv2.destroyAllWindows() except: pass def run(self): print("server starts...") self.sock.bind(self.ADDR) self.sock.listen(1) conn, addr = self.sock.accept() print("remote client success connected...") data = "".encode("utf-8") payload_size = struct.calcsize("L") cv2.namedWindow('Remote', cv2.WINDOW_NORMAL) while True: while len(data) < payload_size: data += conn.recv(81920) packed_size = data[:payload_size] data = data[payload_size:] msg_size = struct.unpack("L", packed_size)[0] while len(data) < msg_size: data += conn.recv(81920) zframe_data = data[:msg_size] data = data[msg_size:] frame_data = zlib.decompress(zframe_data) frame = pickle.loads(frame_data) cv2.imshow('Remote', frame) if cv2.waitKey(1) & 0xFF == 27: break
4.4 視頻縮放和數據壓縮
現在的伺服器和客戶端已經可以運行,你可以在代碼中創建一個Client
類實例和一個Server
類實例,並將IP地址設為127.0.0.1
,埠設為任意合法的(0-65535)且不衝突的值,版本設為IPv4。執行代碼等同於自己和自己通信。如果網路狀況不好,你也許會發現自己和自己的通信也有卡頓現象。為了使畫面質量、延遲能夠和現實網路狀況相匹配,我們需要允許用戶指定通信中畫面的質量,同時我們的代碼應當本身具有壓縮數據的能力,以儘可能利用帶寬。
當用戶指定使用低畫質通信,我們應當對原始數據做變換,最簡單的方式即將捕獲的每一幀按比例縮放,同時降低傳輸的幀速,在代碼中體現為resize
,該函數的第二個參數為縮放中心,後兩個參數為縮放比例,並且根據用戶指定的等級,不再傳輸捕獲的每一幀,而是間隔幾幀傳輸一幀。為了防止用戶指定的畫質過差,代碼中限制了最壞情況下的縮放比例為0.3,最大幀間隔為3。此外,我們在發送每一幀的數據前使用zlib.compress
對其壓縮,儘量降低帶寬負擔。
1 class Video_Client(threading.Thread): 2 def __init__(self ,ip, port, level, version): 3 threading.Thread.__init__(self) 4 self.setDaemon(True) 5 self.ADDR = (ip, port) 6 if level <= 3: 7 self.interval = level 8 else: 9 self.interval = 3 10 self.fx = 1 / (self.interval + 1) 11 if self.fx < 0.3: 12 self.fx = 0.3 13 if version == 4: 14 self.sock = socket(AF_INET, SOCK_STREAM) 15 else: 16 self.sock = socket(AF_INET6, SOCK_STREAM) 17 self.cap = cv2.VideoCapture(0) 18 def __del__(self) : 19 self.sock.close() 20 self.cap.release() 21 def run(self): 22 print("VEDIO client starts...") 23 while True: 24 try: 25 self.sock.connect(self.ADDR) 26 break 27 except: 28 time.sleep(3) 29 continue 30 print("VEDIO client connected...") 31 while self.cap.isOpened(): 32 ret, frame = self.cap.read() 33 sframe = cv2.resize(frame, (0,0), fx=self.fx, fy=self.fx) 34 data = pickle.dumps(sframe) 35 zdata = zlib.compress(data, zlib.Z_BEST_COMPRESSION) 36 try: 37 self.sock.sendall(struct.pack("L", len(zdata)) + zdata) 38 except: 39 break 40 for i in range(self.interval): 41 self.cap.read()
伺服器端最終代碼如下,增加了對接收到數據的解壓縮處理。
1 class Video_Server(threading.Thread): 2 def __init__(self, port, version) : 3 threading.Thread.__init__(self) 4 self.setDaemon(True) 5 self.ADDR = ('', port) 6 if version == 4: 7 self.sock = socket(AF_INET ,SOCK_STREAM) 8 else: 9 self.sock = socket(AF_INET6 ,SOCK_STREAM) 10 def __del__(self): 11 self.sock.close() 12 try: 13 cv2.destroyAllWindows() 14 except: 15 pass 16 def run(self): 17 print("VEDIO server starts...") 18 self.sock.bind(self.ADDR) 19 self.sock.listen(1) 20 conn, addr = self.sock.accept() 21 print("remote VEDIO client success connected...") 22 data = "".encode("utf-8") 23 payload_size = struct.calcsize("L") 24 cv2.namedWindow('Remote', cv2.WINDOW_NORMAL) 25 while True: 26 while len(data) < payload_size: 27 data += conn.recv(81920) 28 packed_size = data[:payload_size] 29 data = data[payload_size:] 30 msg_size = struct.unpack("L", packed_size)[0] 31 while len(data) < msg_size: 32 data += conn.recv(81920) 33 zframe_data = data[:msg_size] 34 data = data[msg_size:] 35 frame_data = zlib.decompress(zframe_data) 36 frame = pickle.loads(frame_data) 37 cv2.imshow('Remote', frame) 38 if cv2.waitKey(1) & 0xFF == 27: 39 break
4.5 加入音頻的捕獲和傳輸
在完成視頻通信的基礎上,整體框架對於音頻通信可以直接挪用,只需要修改其中捕獲視頻/音頻的代碼和伺服器解碼播放的部分。這裡我們使用 PyAudio 庫處理音頻,在 Linux 下你也可以選擇 sounddevice
。關於sounddevice
這裡不做過多介紹,將vchat.py
複製一份,重命名為achat.py
,簡單修改幾處,最終音頻捕獲、傳輸的完整代碼如下。我將上面代碼中的Server
和Client
分別加上Video
和Audio
首碼以區分,同時顯示給用戶的print
輸出語句也做了一定修改,對於視頻加上VIDEO
首碼,音頻加上AUDIO
首碼。如果你對代碼中使用到的 PyAudio 提供的庫函數有所疑問,
1 class Audio_Server(threading.Thread): 2 def __init__(self, port, version) : 3 threading.Thread.__init__(self) 4 self.setDaemon(True) 5 self.ADDR = ('', port) 6 if version == 4: 7 self.sock = socket(AF_INET ,SOCK_STREAM) 8 else: 9 self.sock = socket(AF_INET6 ,SOCK_STREAM) 10 self.p = pyaudio.PyAudio() 11 self.stream = None 12 def __del__(self): 13 self.sock.close() 14 if self.stream is not None: 15 self.stream.stop_stream() 16 self.stream.close() 17 self.p.terminate() 18 def run(self): 19 print("AUDIO server starts...") 20 self.sock.bind(self.ADDR) 21 self.sock.listen(1) 22 conn, addr = self.sock.accept() 23 print("remote AUDIO client success connected...") 24 data = "".encode("utf-8") 25 payload_size = struct.calcsize("L") 26 self.stream = self.p.open(format=FORMAT, 27 channels=CHANNELS, 28 rate=RATE, 29 output=True, 30 frames_per_buffer = CHUNK 31 ) 32 while True: 33 while len(data) < payload_size: 34 data += conn.recv(81920) 35 packed_size = data[:payload_size] 36 data = data[payload_size:] 37 msg_size = struct.unpack("L", packed_size)[0] 38 while len(data) < msg_size: 39 data += conn.recv(81920) 40 frame_data = data[:msg_size] 41 data = data[msg_size:] 42 frames = pickle.loads(frame_data) 43 for frame in frames: 44 self.stream.write(frame, CHUNK) 45 46 class Audio_Client(threading.Thread): 47 def __init__(self ,ip, port, version): 48 threading.Thread.__init__(self) 49 self.setDaemon(True) 50 self.ADDR = (ip, port) 51 if version == 4: 52 self.sock = socket(AF_INET, SOCK_STREAM) 53 else: 54 self.sock = socket(AF_INET6, SOCK_STREAM) 55 self.p = pyaudio.PyAudio() 56 self.stream = None 57 def __del__(self) : 58 self.sock.close() 59 if self.stream is not None: 60 self.stream.stop_stream() 61 self.stream.close() 62 self.p.terminate() 63 def run(self): 64 print("AUDIO client starts...") 65 while True: 66 try: 67 self.sock.connect(self.ADDR) 68 break 69 except: 70 time.sleep(3) 71 continue 72 print("AUDIO client connected...") 73 self.stream = self.p.open(format=FORMAT, 74 channels=CHANNELS, 75 rate=RATE, 76 input=True, 77 frames_per_buffer=CHUNK) 78 while self.stream.is_active(): 79 frames = [] 80 for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)): 81 data = self.stream.read(CHUNK) 82 frames.append(data) 83 senddata = pickle.dumps(frames) 84 try: 85 self.sock.sendall(struct.pack("L", len(senddata)) + senddata) 86 except: 87 break
至此我們完成了 vchat.py 的編寫。
4.6 編寫程式入口 main.py
為了提供用戶參數解析,代碼使用了argparse
。你可能對此前幾個類中初始化方法的self.setDaemon(True)
有疑惑。這個方法的調用使每個線程在主線程結束之後自動退出,保證程式不會出現崩潰且無法銷毀的情況。在main.py
中,我們通過每隔1s做一次線程的保活檢查,如果視頻/音頻中出現阻塞/故障,主線程會終止。
1 import sys 2 import time 3 import argparse 4 from vchat import Video_Server, Video_Client 5 from achat import Audio_Server, Audio_Client 6 7 parser = argparse.ArgumentParser() 8 9 parser.add_argument('--host', type=str, default='127.0.0.1') 10 parser.add_argument('--port', type=int, default=10087) 11 parser.add_argument('--level', type=int, default=1) 12 parser.add_argument('-v', '--version', type=int, default=4) 13 14 args = parser.parse_args() 15 16 IP = args.host 17 PORT = args.port 18 VERSION = args.version 19 LEVEL = args.level 20 21 if __name__ == '__main__': 22 vclient = Video_Client(IP, PORT, LEVEL, VERSION) 23 vserver = Video_Server(PORT, VERSION) 24 aclient = Audio_Client(IP, PORT+1, VERSION) 25 aserver = Audio_Server(PORT+1, VERSION) 26 vclient.start() 27 aclient.start() 28 time.sleep(1) # make delay to start server 29 vserver.start() 30 aserver.start() 31 while True: 32 time.sleep(1) 33 if not vserver.isAlive() or not vclient.isAlive(): 34 print("Video connection lost...") 35 sys.exit(0) 36 if not aserver.isAlive() or not aclient.isAlive(): 37 print("Audio connection lost...") 38 sys.exit(0)
4.7 運行情況
因為實驗樓的環境沒有提供攝像頭,因此我們需要修改一下代碼,讓程式從一個本地視頻文件讀取,模擬攝像頭的訪問。將Video_Client
中self.cap = cv2.VideoCapture(0)
改為self.cap = cv2.VideoCapture('test.mp4')
,即從本地視頻test.mp4
中讀取。在修改完你的代碼後,你可以通過以下命令下載test.mp4
(該視頻文件是周傑倫《浪漫手機》的MV),並檢驗代碼。(請確保在ichat文件夾下!)
$ wget http://labfile.oss.aliyuncs.com/courses/671/test.mp4
$ python3 main.py
和上面命令一樣,在本機可以通過 python3 main.py
來實驗本機和本機的視頻聊天,如果你有條件在同一區域網內的兩台機器上實驗,則可以將程式部署在兩台機器上,並相互連接觀察效果。下麵兩張圖為本機上實驗截圖,有些情況下 PyAudio 可能會提示一些警告,你可以忽視它的提示。用戶也可以指定level
參數,level
越高,畫質越差,level
為 0 為原始畫面,在我們的main.py
中預設level
為 1。
通過在某高校校園網內驗證,程式可以保證長時間順暢通話,偶爾會出現網路質量較差導致的短暫卡頓,不影響實際視頻通話效果。
轉載來自https://hzl-fj.com/2248.html