本文主要介紹了在使用Python面向對象編程時,如何實現組合關係,同時對比了組合關係和繼承關係的優缺點,並講解瞭如何通過csv模塊來保存Python接收/生成的數據。 ...
全網最適合入門的面向對象編程教程:20 類和對象的 Python 實現-組合關係的實現與 CSV 文件保存
摘要:
本文主要介紹了在使用 Python 面向對象編程時,如何實現組合關係,同時對比了組合關係和繼承關係的優缺點,並講解瞭如何通過 csv 模塊來保存 Python 接收/生成的數據。
原文鏈接:
往期推薦:
全網最適合入門的面向對象編程教程:00 面向對象設計方法導論
全網最適合入門的面向對象編程教程:01 面向對象編程的基本概念
全網最適合入門的面向對象編程教程:02 類和對象的 Python 實現-使用 Python 創建類
全網最適合入門的面向對象編程教程:03 類和對象的 Python 實現-為自定義類添加屬性
全網最適合入門的面向對象編程教程:04 類和對象的Python實現-為自定義類添加方法
全網最適合入門的面向對象編程教程:05 類和對象的Python實現-PyCharm代碼標簽
全網最適合入門的面向對象編程教程:06 類和對象的Python實現-自定義類的數據封裝
全網最適合入門的面向對象編程教程:07 類和對象的Python實現-類型註解
全網最適合入門的面向對象編程教程:08 類和對象的Python實現-@property裝飾器
全網最適合入門的面向對象編程教程:09 類和對象的Python實現-類之間的關係
全網最適合入門的面向對象編程教程:10 類和對象的Python實現-類的繼承和里氏替換原則
全網最適合入門的面向對象編程教程:11 類和對象的Python實現-子類調用父類方法
全網最適合入門的面向對象編程教程:12 類和對象的Python實現-Python使用logging模塊輸出程式運行日誌
全網最適合入門的面向對象編程教程:13 類和對象的Python實現-可視化閱讀代碼神器Sourcetrail的安裝使用
全網最適合入門的面向對象編程教程:全網最適合入門的面向對象編程教程:14 類和對象的Python實現-類的靜態方法和類方法
全網最適合入門的面向對象編程教程:15 類和對象的 Python 實現-__slots__魔法方法
全網最適合入門的面向對象編程教程:16 類和對象的Python實現-多態、方法重寫與開閉原則
全網最適合入門的面向對象編程教程:17 類和對象的Python實現-鴨子類型與“file-like object“
全網最適合入門的面向對象編程教程:18 類和對象的Python實現-多重繼承與PyQtGraph串口數據繪製曲線圖
全網最適合入門的面向對象編程教程:19 類和對象的 Python 實現-使用 PyCharm 自動生成文件註釋和函數註釋
更多精彩內容可看:
給你的 Python 加加速:一文速通 Python 並行計算
一個MicroPython的開源項目集錦:awesome-micropython,包含各個方面的Micropython工具庫
文檔和代碼獲取:
可訪問如下鏈接進行對文檔下載:
https://github.com/leezisheng/Doc
本文檔主要介紹如何使用 Python 進行面向對象編程,需要讀者對 Python 語法和單片機開發具有基本瞭解。相比其他講解 Python 面向對象編程的博客或書籍而言,本文檔更加詳細、側重於嵌入式上位機應用,以上位機和下位機的常見串口數據收發、數據處理、動態圖繪製等為應用實例,同時使用 Sourcetrail 代碼軟體對代碼進行可視化閱讀便於讀者理解。
相關示例代碼獲取鏈接如下:https://github.com/leezisheng/Python-OOP-Demo
正文
前面講了面向類與對象的繼承,知道了繼承是一種什麼“是”什麼的關係。然而類與類之間還有另一種關係,這就是組合。組合是將幾個對象收集在一起生成一個新對象的行為。當一個對象是另外一個對象的一部分時,組合通常是不錯的選擇。
例如,汽車是由發動機、傳動裝置、啟動裝置、車前燈、擋風玻璃以及其他部件組成的,發動機又是由活塞、曲柄軸和閥門等組合而成的。汽車是發動機等多個元器件的抽象,而發動機是活塞等元器件的抽象,二者處於不同的層次而又有彼此交互的介面,組合是提供不同抽象層的好辦法。汽車對象可以提供司機所需要的介面,同時也能夠獲取內在組成部分,從而為機械師提供適合操作的深層抽象。當然,如果機械師需要更多信息來診斷問題或調整發動機,這些組成部分也可以進一步被細分。
總的來說,組合就是讓不同的類混合併且加入其他類中來增加功能和代碼重用性,這種適用於由多個小類組成一個大類的情況,並且不需要對小類進行太多修改。在前面示例中,我們實現了主機的串口收發和繪圖功能,在實際應用中,我們往往需要將感測器數據存儲到文件中,以便後續的查看和處理,很明顯前面的感測器數據為一維的時間序列數據,適合存儲為表格類型(即列標題為索引和值),我們通常將該類數據保存為 csv 格式文件,csv 是一種字元串文件的格式,它組織數據的語法就是在字元串之間加分隔符(行與行之間是加換行符,同行字元之間是加逗號分隔),可以用任意的文本編輯器打開(如記事本),也可以用 Excel 打開,還可以通過 Excel 把文件另存為 csv 格式。用 csv 格式存儲數據,讀寫比較方便,易於實現,文件也會比 Excel 文件小。但 csv 文件缺少 Excel 文件本身的很多功能,如不能嵌入圖像和圖表,不能生成公式等等。
操作 csv 文件我們需要藉助 csv 模塊,python 自帶 csv 模塊,不需要我們使用 pip 安裝,我們可以點擊如下鏈接查看 csv 模塊使用方法:
https://docs.python.org/zh-cn/3.13/library/csv.html#csv.writer
這裡,我們首先定義一個 FileIOClass 類,其中具有初始化方法、寫入感測器數據到文件方法和關閉文件方法,示例代碼如下:
import csv
_# 使用typing模塊提供的複合註解功能_
from typing import List
class FileIOClass:
def __init__(self,path:str="G:\\Python面向對象編程\\Demo\\file.csv"):
'''
初始化csv文件和列標題
:param path: 文件路徑和文件名
'''
self.path = path
_# path為輸出路徑和文件名,newline=''是為了不出現空行_
self.csvFile = open(path, "w+", newline='')
_# rowname為列名,index-索引,data-數據_
self.rowname = ['index', 'data']
_# 返回一個writer對象,將用戶的數據在給定的文件型對象上轉換為帶分隔符的字元串_
self.writer = csv.writer(self.csvFile)
_# 寫入csv文件的列標題_
self.writer.writerow( self.rowname)
def WriteFile(self,index:List[int],data:List[int])->None:
'''
:param index: 感測器索引列表
:param data: 感測器數據列表
:return:
'''
writedatalist = []
for i in range(len(data)):
writedatalist.append([index[i],data[i]])
_# 將列表中的每個元素將被寫入CSV文件的一列中_
self.writer.writerow(writedatalist[i])
def CloseFile(self)->None:
'''
關閉文件
:return: None
'''
self.csvFile.close()
這裡,在初始化方法中,我們需要傳入文件保存路徑。之後創建一個 writer 對象,將用戶的數據在給定的文件型對象上轉換為帶分隔符的字元串,同時寫入 csv 文件的列標題。在 WriteFile 方法中傳入數據的索引列表用於表示數據的先後順序,之後是數據列表(這裡的類型註解需要使用 typing 模塊提供的複合註解功能),並迴圈將每個元素將被寫入 CSV 文件的一列中,最後定義了文件的關閉方法。
在主函數中,我們創建 FileIOClass 對象,寫入模擬感測器數據後關閉文件,以下為示例代碼和運行效果:
if __name__ == '__main__':
path = "G:\\Python面向對象編程\\Demo\\file.csv"
data = [11,42,307,46,55,61,78,80,19,11]
index = [count for count in range(len(data))]
file = FileIOClass(path)
file.WriteFile(index,data)
file.CloseFile()
這裡,我們可以直接在 MasterClass 類的初始化中創建 FileIOClass 類的實例化對象來實現組合。代碼如下:
_# 文件保存路徑_
self.savepath = "G:\\Python面向對象編程\\Demo\\file.csv"
_# 創建FileIOClass類的實例化對象_
self.fileio = FileIOClass(self.savepath)
通過 sourcetrail,我們可以清晰看到類之間的組合與繼承關係:
在主程式中,我們在主機接收 10 次數據後,將數據保存到 file.csv 中:
if __name__ == "__main__":
_ # 創建數據列表_
datalist = []
m = MasterClass(state = MasterClass.IDLE_STATE,
port = "COM17",
wintitle = "Basic plotting examples",
plottitle = "Updating plot",
width = 1000,
height = 600)
m.StartMaster()
m.SendSensorCMD(MasterClass.SENDID_CMD)
m.RecvSensorID()
# 迴圈10次接收數據
for i in range(10):
m.SendSensorCMD(MasterClass.SENDVALUE_CMD)
value = m.RecvSensorValue()
datalist.append(value)
indexlist = [count for count in range(len(datalist))]
# 寫入數據
m.fileio.WriteFile(indexlist,datalist)
m.fileio.CloseFile()
如下為運行效果:
目前,整個文件的完整代碼如下,可以看到單單是這麼一個簡單程式就有了三百多行,對於代碼查找修改來講,非常不便。同時我們註意到,幾個不同類之間似乎功能並不相同,不應該放到一個文件中。下一節我們將會說如何利用 Python 中的模塊和包來組織我們的代碼。
完整代碼如下:
_# 串口相關庫_
import serial
import serial.tools.list_ports
_# 隊列相關_
import queue
import random
_# 日誌輸出相關庫_
import logging
_# 曲線作圖相關庫_
import pyqtgraph as pg
import numpy as np
from pyqtgraph.Qt import QtCore
_# 文件讀寫相關庫_
import csv
_# 使用typing模塊提供的複合註解功能_
from typing import List
import time
_# # 設置日誌輸出級別_
_# logging.basicConfig(level=logging.DEBUG)_
_# 在配置下日誌輸出目標文件和日誌格式_
LOG_FORMAT="%(asctime)s-%(levelname)s-%(message)s"
logging.basicConfig(filename='my.log',level=logging.DEBUG,format=LOG_FORMAT)
class SerialClass:
_# 限定SerialClass對象只能綁定以下屬性_
__slots__ = ('dev','_SerialClass__devstate')
_# 初始化_
_# 使用預設參數_
def __init__(self,
devport:str = "COM17",
devbaudrate:int = 115200,
devbytesize:int = serial.EIGHTBITS,
devparity :str = serial.PARITY_NONE,
devstopbits:int = serial.STOPBITS_ONE):
_# 直接傳入serial.Serial()類_
self.dev = serial.Serial()
self.dev.port = devport
self.dev.baudrate = devbaudrate
self.dev.bytesize = devbytesize
self.dev.parity = devparity
self.dev.stopbits = devstopbits
_# 表示串口設備的狀態-打開或者關閉_
_# 初始化時為關閉_
self.__devstate = False
print("SerialClass init")
logging.info("SerialClass init")
_# 取值方法_
@property
def devstate(self):
return self.__devstate
_# 打開串口_
def OpenSerial(self):
print("SerialClass-OpenSerial")
logging.info("SerialClass-OpenSerial")
self.dev.open()
self.__devstate = True
_# 關閉串口_
def CloseSerial(self):
print("SerialClass-CloseSerial")
logging.info("SerialClass-CloseSerial")
self.dev.close()
self.__devstate = False
_# 串口讀取_
def ReadSerial(self):
print("SerialClass-ReadSerial")
logging.info("SerialClass-ReadSerial")
if self.__devstate:
_# 阻塞方式讀取_
_# 按行讀取_
data = self.dev.readline()
_# 收到為二進位數據_
_# 用utf-8編碼將二進位數據解碼為unicode字元串_
_# 字元串轉為int類型_
data = int(data.decode('utf-8', 'replace'))
return data
_# 串口寫入_
def WriteSerial(self,write_data):
print("SerialClass-WriteSerial")
logging.info("SerialClass-WriteSerial")
if self.__devstate:
_# 非阻塞方式寫入_
self.dev.write(write_data.encode())
_# 輸出換行符_
_# write的輸入參數必須是bytes 格式_
_# 字元串數據需要encode()函數將其編碼為二進位數據,然後才可以順利發送_
_# \r\n表示換行回車_
self.dev.write('\r\n'.encode())
def RetSerialState(self):
if self.dev.isOpen():
self.__devstate = True
return True
else:
self.__devstate = False
return False
class PlotClass:
_# 繪圖類初始化_
def __init__(self,wintitle:str="Basic plotting examples",plottitle:str="Updating plot",width:int=1000,height:int=600):
'''
用於初始化Plot類
:param wintitle: 視窗標題
:param plottitle: 圖層標題
:param width: 視窗寬度
:param height: 視窗高度
'''
_# Qt應用實例對象_
self.app = None
_# 視窗對象_
self.win = None
_# 設置視窗標題_
self.title = wintitle
_# 設置視窗尺寸_
self.width = width
self.height = height
_# 感測器數據_
self.value = 0
_# 計數變數_
self.__count = 0
_# 感測器數據緩存列表_
self.valuelist = []
_# 繪圖曲線_
self.curve = None
_# 圖層對象_
self.plotob = None
_# 圖層標題_
self.plottitle = plottitle
_# 定時器對象_
self.timer = QtCore.QTimer()
_# 定時時間_
self.time = 0
_# Qt應用和視窗初始化_
self.appinit()
print("PLOT INIT SUCCESS")
logging.info("PLOT INIT SUCCESS")
_# 應用程式初始化_
def appinit(self):
'''
用於qt應用程式初始化,添加視窗、曲線和圖層
:return: None
'''
_# 創建一個Qt應用,並返回該應用的實例對象_
self.app = pg.mkQApp("Plotting Example")
_# 生成多面板圖形_
_# show:(bool) 如果為 True,則在創建小部件後立即顯示小部件。_
_# title:(str 或 None)如果指定,則為此小部件設置視窗標題。_
self.win = pg.GraphicsLayoutWidget(show=True, title=self.title)
_# 設置視窗尺寸_
self.win.resize(self.width, self.height)
_# 進行視窗全局設置,setConfigOptions一次性配置多項參數_
_# antialias啟用抗鋸齒,useNumba對圖像進行加速_
pg.setConfigOptions(antialias=True, useNumba=True)
_# 添加圖層_
self.plotob = self.win.addPlot(title=self.plottitle)
_# 添加曲線_
self.curve = self.plotob.plot(pen='y')
_# 接收數據_
def GetValue(self,value):
'''
用於接收感測器數據,加入緩存列表
:param value: 感測器數據
:return: None
'''
self.value = value
_# 加入數據緩存列表_
self.valuelist.append(value)
print("PLOT RECV DATA : "+str(self.value))
logging.info("PLOT RECV DATA : "+str(self.value))
_# 更新曲線數據_
def DataUpdate(self):
'''
用於定時進行曲線更新,這裡模擬繪製正弦曲線
:return: None
'''
_# 模擬繪製正弦曲線_
_# 計數變數更新_
self.__count = self.__count + 0.1
self.value = np.sin(self.__count)
self.GetValue(self.value)
_# 將數據轉化為圖形_
self.curve.setData(self.valuelist)
_# 設置定時更新_
def SetUpdate(self,time:int = 100):
'''
設置定時更新任務
:param time: 定時的時間
:return: None
'''
_# 定時器結束,觸發DataUpdate方法_
self.timer.timeout.connect(self.DataUpdate)
_# 啟動定時器_
self.timer.start(time)
_# 定時時間_
self.time = time
print("PLOT SET UPDATA")
logging.info("PLOT SET UPDATA")
_# 進入主事件迴圈並等待_
pg.exec()
class FileIOClass:
def __init__(self,path:str="G:\\Python面向對象編程\\Demo\\file.csv"):
'''
初始化csv文件和列標題
:param path: 文件路徑和文件名
'''
self.path = path
_# path為輸出路徑和文件名,newline=''是為了不出現空行_
self.csvFile = open(path, "w+", newline='')
_# rowname為列名,index-索引,data-數據_
self.rowname = ['index', 'data']
_# 返回一個writer對象,將用戶的數據在給定的文件型對象上轉換為帶分隔符的字元串_
self.writer = csv.writer(self.csvFile)
_# 寫入csv文件的列標題_
self.writer.writerow(self.rowname)
def WriteFile(self,index:List[int],data:List[int])->None:
'''
:param index: 感測器索引列表
:param data: 感測器數據列表
:return:
'''
writedatalist = []
for i in range(len(data)):
writedatalist.append([index[i],data[i]])
_# 將列表中的每個元素將被寫入CSV文件的一列中_
self.writer.writerow(writedatalist[i])
def CloseFile(self)->None:
'''
關閉文件
:return: None
'''
self.csvFile.close()
class SensorClass(SerialClass):
_# 類變數:_
_# RESPOND_MODE -響應模式-0_
_# LOOP_MODE -迴圈模式-1_
RESPOND_MODE,LOOP_MODE = (0,1)
_# 類變數:_
_# START_CMD - 開啟命令 -0_
_# STOP_CMD - 關閉命令 -1_
_# SENDID_CMD - 發送ID命令 -2_
_# SENDVALUE_CMD - 發送數據命令 -3_
START_CMD,STOP_CMD,SENDID_CMD,SENDVALUE_CMD = (0,1,2,3)
_# 類的初始化_
def __init__(self,port:str = "COM11",id:int = 0,state:int = RESPOND_MODE):
_# 調用父類的初始化方法,super() 函數將父類和子類連接_
super().__init__(port)
self.sensorvalue = 0
self.sensorid = id
self.sensorstate = state
print("Sensor Init")
logging.info("Sensor Init")
@staticmethod
_# 判斷感測器ID號是否正確:這裡判斷ID號是否在0到99之間_
def IsTrueID(id:int = 0):
if id >= 0 and id <= 99:
print("Sensor ID True")
return True
else:
print("Sensor ID False")
return False
_# 感測器上電初始化_
def InitSensor(self):
_# 感測器上電初始化工作_
_# 同時輸出ID號以及狀態_
print("Sensor %d Init complete : %d"%(self.sensorid,self.sensorstate))
logging.info("Sensor %d Init complete : %d"%(self.sensorid,self.sensorstate))
_# 開啟感測器_
def StartSensor(self):
super().OpenSerial()
print("Sensor %d start serial %s "%(self.sensorid,self.dev.port))
logging.info("Sensor %d start serial %s "%(self.sensorid,self.dev.port))
_# 停止感測器_
def StopSensor(self):
super().CloseSerial()
print("Sensor %d close serial %s " % (self.sensorid, self.dev.port))
logging.info("Sensor %d close serial %s " % (self.sensorid, self.dev.port))
_# 發送感測器ID號_
def SendSensorID(self):
super().WriteSerial(str(self.sensorid))
print("Sensor %d send id "%self.sensorid)
logging.info("Sensor %d send id "%self.sensorid)
_# 發送感測器數據_
def SendSensorValue(self):
_# 生成[1, 10]內的隨機整數_
data = random.randint(1, 10)
super().WriteSerial(str(data))
print("Sensor %d send data %d" % (self.sensorid,data))
logging.info("Sensor %d send data %d" % (self.sensorid,data))
_# 接收主機指令_
def RecvMasterCMD(self):
cmd = super().ReadSerial()
print("Sensor %d recv cmd %d " % (self.sensorid,cmd))
logging.info("Sensor %d recv cmd %d " % (self.sensorid,cmd))
return cmd
class MasterClass(SerialClass,PlotClass):
_# 類變數:_
_# BUSY_STATE -忙碌狀態-0_
_# IDLE_STATE -空閑狀態-1_
BUSY_STATE, IDLE_STATE = (0, 1)
_# 類變數:_
_# START_CMD - 開啟命令 -0_
_# STOP_CMD - 關閉命令 -1_
_# SENDID_CMD - 發送ID命令 -2_
_# SENDVALUE_CMD - 發送數據命令 -3_
START_CMD, STOP_CMD, SENDID_CMD, SENDVALUE_CMD = (0, 1, 2, 3)
_# 類的初始化_
def __init__(self,state:int = IDLE_STATE,port:str = "COM17",wintitle:str="Basic plotting examples",plottitle:str="Updating plot",width:int=1000,height:int=600):
_# 分別調用不同父類的__init__方法_
SerialClass.__init__(self,port)
PlotClass.__init__(self,wintitle,plottitle,width,height)
self.valuequeue = queue.Queue(10)
self.__masterstatue = state
_# 初始化完成的標誌量_
self.INIT_FLAG = False
_# 文件保存路徑_
self.savepath = "G:\\Python面向對象編程\\Demo\\file.csv"
_# 創建FileIOClass類的實例化對象_
self.fileio = FileIOClass(self.savepath)
print("MASTER INIT SUCCESSS")
logging.info("MASTER INIT SUCCESSS")
@classmethod
def MasterInfo(cls):
print("Info : "+str(cls))
_# 開啟主機_
def StartMaster(self):
super().OpenSerial()
print("START MASTER :"+self.dev.port)
logging.info("START MASTER :"+self.dev.port)
_# 停止主機_
def StopMaster(self):
super().CloseSerial()
print("CLOSE MASTER :" + self.dev.port)
logging.info("CLOSE MASTER :" + self.dev.port)
_# 接收感測器ID號_
def RecvSensorID(self):
sensorid = super().ReadSerial()
print("MASTER RECIEVE ID : " + str(sensorid))
logging.info("MASTER RECIEVE ID : " + str(sensorid))
return sensorid
_# 接收感測器數據_
def RecvSensorValue(self):
data = super().ReadSerial()
print("MASTER RECIEVE DATA : " + str(data))
logging.info("MASTER RECIEVE DATA : " + str(data))
self.valuequeue.put(data)
return data
_# 主機發送命令_
def SendSensorCMD(self,cmd):
super().WriteSerial(str(cmd))
print("MASTER SEND CMD : " + str(cmd))
logging.info("MASTER SEND CMD : " + str(cmd))
_# 主機返回工作狀態-_
def RetMasterStatue(self):
return self.__masterstatue
_# 重寫父類的DataUpdate方法_
def DataUpdate(self):
self.SendSensorCMD(self.SENDVALUE_CMD)
self.value = self.RecvSensorValue()
self.WriteSerial("Recv:"+str(self.value))
self.GetValue(self.value)
self.curve.setData(self.valuelist)
print("PLOT UPDATA : " + str(self.value))
logging.info("PLOT UPDATA : " + str(self.value))
class DevClass(SerialClass):
def __init__(self,port:str = "COM1"):
super().__init__(port)
_# 開啟設備_
def StartDev(self):
super().OpenSerial()
print("START Dev :" + self.dev.port)
def ReadSerial(self,byte_size):
if super().RetSerialState():
data = self.dev.read(byte_size)
data = int(data.decode('utf-8', 'replace'))
return data
_# 判斷串口類對象的串口是否開啟_
def IsSerialConnected(serialclass):
return serialclass.RetSerialState()
if __name__ == "__main__":
_# 創建數據列表_
datalist = []
m = MasterClass(state = MasterClass.IDLE_STATE,
port = "COM17",
wintitle = "Basic plotting examples",
plottitle = "Updating plot",
width = 1000,
height = 600)
m.StartMaster()
m.SendSensorCMD(MasterClass.SENDID_CMD)
m.RecvSensorID()
_# 迴圈10次接收數據_
for i in range(10):
m.SendSensorCMD(MasterClass.SENDVALUE_CMD)
value = m.RecvSensorValue()
datalist.append(value)
indexlist = [count for count in range(len(datalist))]
_# 寫入數據_
m.fileio.WriteFile(indexlist,datalist)
m.fileio.CloseFile()