8.0 Python 使用進程與線程

来源:https://www.cnblogs.com/LyShark/archive/2023/08/14/17628767.html
-Advertisement-
Play Games

python 進程與線程是併發編程的兩種常見方式。進程是操作系統中的一個基本概念,表示程式在操作系統中的一次執行過程,擁有獨立的地址空間、資源、優先順序等屬性。線程是進程中的一條執行路徑,可以看做是輕量級的進程,與同一個進程中的其他線程共用相同的地址空間和資源。 ...


python 進程與線程是併發編程的兩種常見方式。進程是操作系統中的一個基本概念,表示程式在操作系統中的一次執行過程,擁有獨立的地址空間、資源、優先順序等屬性。線程是進程中的一條執行路徑,可以看做是輕量級的進程,與同一個進程中的其他線程共用相同的地址空間和資源。

線程和進程都可以實現併發編程,但是它們之間有幾點不同:

  • 線程間共用進程的記憶體空間,但進程間的記憶體空間是相互獨立的;
  • 線程創建和銷毀的開銷較小,但是線程切換的開銷較大;
  • 進程間通信需要較為複雜的 IPC(Inter-Process Communication)機制,線程間通信則可以直接讀寫共用記憶體;
  • 多進程可以充分利用多核 CPU 的性能,但是多線程受 GIL(Global Interpreter Lock)限制,只能利用單核 CPU 的性能。

在選擇使用進程還是線程時,需要根據具體場景和需求進行權衡和選擇。如果任務需要充分利用多核 CPU,且任務之間互不影響,可以選擇多進程;如果任務之間需要共用資源和數據,可以選擇多線程。同時,需要註意在 python 中使用多線程時,由於 GIL 的存在,可能無法實現真正的並行。

8.1 創建並使用線程

線程是操作系統調度的最小執行單元,是進程中的一部分,能夠提高程式的效率。在python中,創建線程需要使用threading模塊。該模塊的實現方法是底層調用了C語言的原生函數來實現線程的創建和管理。在系統中,所有的線程看起來都是同時執行的,但實際上是由操作系統進行時間片輪轉調度的。

使用函數創建線程: 創建線程並傳遞參數實現指定函數多線程併發,使用join方法,等待線程執行完畢後的返回結果.

import os,time
import threading
now = lambda:time.time()

def MyThread(x,y):                         # 定義每個線程要執行的函數體
    time.sleep(5)                          # 睡眠5秒鐘
    print("傳遞的數據:%s,%s"%(x,y))         # 其中有兩個參數,我們動態傳入

if __name__ == "__main__":
    ThreadPool = []

    start = now()
    for item in range(10):                                             # 創建10個線程併發執行函數
        thread = threading.Thread(target=MyThread,args=(item,item+1,)) # args =>函數的參數
        thread.start()                                                 # 啟動線程
        ThreadPool.append(thread)
    for item in ThreadPool:
        item.join()
        print("[+] 線程信息: {}".format(item))
    stop = now()
    print("[+] 線程總耗時: {} s".format(int(stop-start)))

使用類創建內部線程: 通過定義類,將線程函數與類進行結合實現一體化該方式調用方便思維明確.

import os,time
import threading

class MyThread(threading.Thread):
    def __init__(self,x,y):
        super(MyThread, self).__init__()
        self.x = x
        self.y = y

    def run(self):                       # 用於執行相應操作(固定寫法)
        print("[+] 當前執行運算: {} + {}".format(self.x,self.y))
        self.result = self.x + self.y

    def get_result(self):                # 獲取計算結果
        try:
            return self.result
        except Exception:
            return None

if __name__ == "__main__":
    ThreadPool = []

    for item in range(1,10):
        obj = MyThread(item,item+1)
        obj.start()
        ThreadPool.append(obj)

    for item in ThreadPool:
        item.join()
        print("[+] 獲取返回: ",item.get_result())

使用類創建外部線程: 該定義方式與上方完全不同,我們可以將執行過程定義到類的外部為單獨函數,然後類內部去調用傳參.

import os,time
import threading

def MyThreadPrint(x,y):
    print("[+] 當前執行運算: {} + {}".format(x,y))
    result = x + y
    return result

class MyThread(threading.Thread):
    def __init__(self,func,args=()):
        super(MyThread, self).__init__()
        self.func = func
        self.args = args

    def run(self):
        self.result = self.func(*self.args)

    def get_result(self):
        try:
            return self.result
        except Exception:
            return None

if __name__ == "__main__":
    ThreadPool = []

    for item in range(1,10):
        obj = MyThread(func=MyThreadPrint,args=(item,item+1))
        obj.start()
        ThreadPool.append(obj)

    for item in ThreadPool:
        item.join()
        print("[+] 獲取返回: ",item.get_result())

線上程中創建子線程: 通過創建一個守護線程,並讓守護線程調用子線程,從而實現線程中調用線程,線程嵌套調用.

import time
import threading

# run => 子線程 => 由主線程調用它
def run(num):
    print("這是第 {} 個子線程".format(num))
    time.sleep(2)

# main = > 主守護線程 => 在裡面運行5個子線程
def main():
    for each in range(5):
        thread = threading.Thread(target=run,args=(each,))
        thread.start()
        print("啟動子線程: {} 編號: {}".format(thread.getName(),each))
    thread.join()

if __name__ == "__main__":
    daemon = threading.Thread(target=main,args=())
    daemon.setDaemon(True)   # 設置主線程為守護線程
    daemon.start()           # 啟動守護線程
    daemon.join(timeout=10)  # 設置10秒後關閉,不論子線程是否執行完畢

簡單的線程互斥鎖(Semaphore): 同時允許一定數量的線程更改數據,也就是限制每次允許執行的線程數.

import threading,time
semaphore = threading.BoundedSemaphore(5)         #最多允許5個線程同時運行

def run(n):
    semaphore.acquire()                           #添加信號
    time.sleep(1)
    print("運行這個線程中: %s"%n)
    semaphore.release()                           #關閉信號

if __name__ == '__main__':
    for i in range(20):                           #同時執行20個線程
        t = threading.Thread(target=run, args=(i,))
        t.start()

while threading.active_count() != 1:              #等待所有線程執行完畢
    pass
else:
    print('----所有線程執行完畢了---')
import threading,time

class mythreading(threading.Thread):
    def run(self):
        semaphore.acquire()  #獲取信號量鎖
        print('running the thread:',self.getName())
        time.sleep(2)
        semaphore.release()  #釋放信號量鎖

if __name__ == "__main__":
    semaphore = threading.BoundedSemaphore(3) # 只運行3個線程同時運行
    for i in range(20):
        t1 = mythreading()
        t1.start()
    t1.join()

線程全局鎖(Lock): 添加本全局鎖以後,能夠保證在同一時間內保證只有一個線程具有許可權.

import time
import threading

num = 0                  #定義全局共用變數
thread_list = []         #線程列表
lock = threading.Lock()  #生成全局鎖

def SumNumber():
    global num          #在每個線程中獲取這個全局變數
    time.sleep(2)
    lock.acquire()      #修改數據前給數據加鎖
    num += 1            #每次進行遞增操作
    lock.release()      #執行完畢以後,解除鎖定


for x in range(50):     #指定生成線程數
    thread = threading.Thread(target=SumNumber)
    thread.start()              #啟動線程
    thread_list.append(thread)  #將結果列表加入到變數中

for y in thread_list:           #等待執行完畢.
    y.join()

print("計算結果: ",num)

線程遞歸鎖(RLock): 遞歸鎖和全局鎖差不多,遞歸鎖就是在大鎖中還要添加個小鎖,遞歸鎖是常用的鎖.

import threading
import time

num = 0                          #初始化全局變數
lock = threading.RLock()         #設置遞歸鎖

def fun1():
    lock.acquire()              #添加遞歸鎖
    global num
    num += 1
    lock.release()              #關閉遞歸鎖
    return num

def fun2():
    lock.acquire()              #添加遞歸鎖
    res = fun1()
    print("計算結果: ",res)
    lock.release()              #關閉遞歸鎖

if __name__ == "__main__":
    for x in range(10):         #生成10個線程
        thread = threading.Thread(target=fun2)
        thread.start()

while threading.active_count() != 1:   #等待所有線程執行完成
    print(threading.active_count())
else:
    print("所有線程運行完成...")
    print(num)

線程互斥鎖量控制併發: 使用BoundedSemaphore定義預設信號10,既可以實現控制單位時間內的程式併發量.

import os,time
import threading

def MyThread(x):
    lock.acquire()       # 上鎖
    print("執行數據: {}".format(x))
    lock.release()       # 釋放鎖
    time.sleep(2)        # 模擬函數消耗時間
    threadmax.release()  # 釋放信號,可用信號加1

if __name__ == "__main__":
    # 此處的BoundedSemaphore就是說預設給與10個信號
    threadmax = threading.BoundedSemaphore(10)  # 限制線程的最大數量為10個
    lock = threading.Lock()   # 將鎖內的代碼串列化(防止print輸出亂行)
    ThreadPool = []           # 執行線程池

    for item in range(1,100):
        threadmax.acquire()  # 增加信號,可用信號減1
        thread = threading.Thread(target=MyThread,args=(item,))
        thread.start()
        ThreadPool.append(thread)

    for item in ThreadPool:
        item.join()

線程驅動事件(Event): 線程事件用於主線程式控制制其他線程的執行,事件主要提供了三個方法set、wait、clear、is_set,分別用於設置檢測和清除標誌.

import threading
event = threading.Event()

def func(x,event):
    print("函數被執行了: %s 次.." %x)
    event.wait()         # 檢測標誌位狀態,如果為True=繼續執行以下代碼,反之等待.
    print("載入執行結果: %s" %x)

for i in range(10):      # 創建10個線程
    thread = threading.Thread(target=func,args=(i,event,))
    thread.start()

print("當前狀態: %s" %event.is_set())      # 檢測當前狀態,這裡為False
event.clear()                             # 將標誌位設置為False,預設為False
temp=input("輸入yes解鎖新姿勢: ")          # 輸入yes手動設置為True
if temp == "yes":
    event.set()                           # 設置成True
    print("當前狀態: %s" %event.is_set())  # 檢測當前狀態,這裡為True
import threading

def show(event):
    event.wait()                     # 阻塞線程執行程式
    print("執行一次線程操作")

if __name__ == "__main__":
    event_obj = threading.Event()    # 創建event事件對象
    for i in range(10):
        t1 = threading.Thread(target=show,args=(event_obj,))
        t1.start()
        inside = input('>>>:')
        if inside == '1':
            event_obj.set() # 當用戶輸入1時set全局Flag為True,線程不再阻塞
        event_obj.clear()   # 將Flag設置為False

線程實現條件鎖: 條件(Condition) 使得線程等待,只有滿足某條件時,才釋放N個線程.

import threading

def condition_func():
    ret = False
    inp = input(">> ")
    if inp == '1':
        ret = True
    return ret

def run(n):
    con.acquire()                # 條件鎖
    con.wait_for(condition_func) # 判斷條件
    print('running...',n)
    con.release()                # 釋放鎖

if __name__ == "__main__":
    con = threading.Condition()  # 建立線程條件對象
    for i in range(10):
        t = threading.Thread(target=run,args=(i,))
        t.start()
        t.join()

單線程非同步併發執行: 在單線程下實現非同步執行多個函數,返回耗時取決於最後一個函數的執行時間.

import time,asyncio

now = lambda :time.time()

async def GetSystemMem(sleep):
    print("[+] 執行獲取記憶體非同步函數.")
    await asyncio.sleep(sleep)   # 設置等待時間
    return 1

async def GetSystemCPU(sleep):
    print("[+] 執行獲取CPU非同步函數.")
    await asyncio.sleep(sleep)   # 設置等待時間
    return 1

if __name__ == "__main__":
    stop = now()
    mem = GetSystemMem(1)
    cpu = GetSystemCPU(4)

    task=[
        asyncio.ensure_future(mem),             # 將多個任務添加進一個列表
        asyncio.ensure_future(cpu)
    ]
    loop=asyncio.get_event_loop()               # 創建一個事件迴圈
    loop.run_until_complete(asyncio.wait(task)) # 開始併發執行

    for item in task:
        print("[+] 返回結果: ",item.result())    # 輸出回調
    print('總耗時: {}'.format(stop - now()))

8.2 創建並使用進程

進程是指正在執行的程式,創建進程需要使用multiprocessing模塊,創建方法和線程相同,但由於進程之間的數據需要各自持有一份,所以創建進程需要更大的開銷。進程間數據不共用,多進程可以用來處理多任務,但很消耗資源。計算密集型任務最好交給多進程來處理,I/O密集型任務最好交給多線程來處理。另外,進程的數量應該和CPU的核心數保持一致,以充分利用系統資源。

使用進程函數執行命令: 通過系統提供的進程線程函數完成對系統命令的調用與執行.

>>> import os,subprocess
>>>
>>> os.system("ping -n 1 www.baidu.com")       # 在當前shell中執行命令
>>>
>>> ret = os.popen("ping -n 1 www.baidu.com")  # 在子shell中執行命令
>>> ret.read()
>>>
>>> subprocess.run("ping www.baidu.com",shell=True)
>>> subprocess.call("ping www.baidu.com", shell=True)
>>>
>>> ret = subprocess.Popen("ping www.baidu.com",shell=True,stdout=subprocess.PIPE)
>>> ret.stdout.read()

創建多進程與子線程: 通過使用multiprocessing庫,迴圈創建4個主進程,而在每個主進程內部又起了5個子線程.

import multiprocessing
import threading,os

def ThreadingFunction():
    print("[-] ----> 子線程PPID: {}".format(threading.get_ident()))

def ProcessFunction(number):
    print("[*] -> 主進程PID: {} 父進程: {}".format(os.getpid(),os.getppid()))
    for i in range(5):                                       # 在主進程里開闢5個線程
        thread = threading.Thread(target=ThreadingFunction,) # 嵌套子線程
        thread.start()                                       # 執行子線程

if __name__ == "__main__":
    for item in range(4):                                    # 啟動4個主進程
        proc = multiprocessing.Process(target=ProcessFunction,args=(item,))
        proc.start()
        proc.join()

使用基於類的方式創建進程: 除了使用函數式方式創建進程以外,我們還可以使用基於類的方式創建.

import os,time
from multiprocessing import Process

class Myprocess(Process):
    def __init__(self,person):
        super().__init__()
        self.person = person

    def run(self):
        print("[*] -> 當前PID: {}".format(os.getpid()))
        print("--> 傳入的人名: {}".format(self.person))
        time.sleep(3)

if __name__ == '__main__':
    process = Myprocess("lyshark")
    #process.daemon = True # 設置p為守護進程
    process.start()

進程鎖(Lock): 進程中也有鎖,可以實現進程之間數據的一致性,也就是進程數據的同步,保證數據不混亂.

# 由併發變成了串列,犧牲了運行效率,但避免了競爭
import multiprocessing

def func(loc,num):
    loc.acquire()                        #添加進程鎖
    print("hello ---> %s" %num)
    loc.release()                        #關閉進程鎖

if __name__ == "__main__":
    lock = multiprocessing.Lock()        #生成進程鎖

    for number in range(10):
        proc = multiprocessing.Process(target=func,args=(lock,number,))
        proc.start()

非同步進程池: 進程池內部維護一個進程式列,當使用時則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進程,那麼程式就會等待,直到進程池中有可用進程為止.

import multiprocessing
import time

def ProcessFunction(number):
    time.sleep(2)
    print("[+] 進程執行ID: {}".format(number))

def ProcessCallback(arg):
    print("[-] 進程執行結束,執行回調函數")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=5)               # 允許進程池同時放入5個進程
    for item in range(10):
        pool.apply_async(func=ProcessFunction,args=(item,),callback=ProcessCallback)
    pool.close()
    pool.join()
from multiprocessing import Pool, TimeoutError
import time,os

def f(x):
    return x*x

if __name__ == '__main__':
    #啟動4個工作進程作為進程池
    with Pool(processes=4) as pool:
        #返回函數參數運行結果列表
        print(pool.map(f, range(10)))
        #在進程池中以任意順序列印相同的數字
        for i in pool.imap_unordered(f, range(10)):
            print(i,end=' ')
        #非同步評估
        res = pool.apply_async(f,(20,))      #在進程池中只有一個進程運行
        print('\n',res.get(timeout=1))       #列印結果,超時為1秒
        #列印該進程的PID
        res = pool.apply_async(os.getpid,()) #在進程池中只有一個進程運行
        print(res.get(timeout=1))            #列印進程PID

        #列印4個進程的PID
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        #進程等待10秒,獲取數據超時為1秒,將輸出異常
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

8.3 多進程數據共用

一般當我們創建兩個進程後,進程各自持有一份數據,預設無法共用數據,如果我們想要共用數據必須通過一個中間件來實現數據的交換,來幫你把數據進行一個投遞,要實現進程之間的數據共用,其主要有以下幾個方法來實現進程間數據的共用.

共用隊列(Queue): 這個Queue主要實現進程與進程之間的數據共用,與線程中的Queue不同.

from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
 
def foo(i,arg):
    arg.put(i)
    print('say hi',i,arg.qsize())
 
li = queues.Queue(20,ctx=multiprocessing)
 
for i in range(10):
    p = Process(target=foo,args=(i,li,))
    p.start()

共用整數(int): 整數之間的共用,只需要使用multiprocessing.Value方法,即可實現.

import multiprocessing

def func(num):
    num.value = 1024                              #雖然賦值了,但是子進程改變了這個數值
    print("函數中的數值: %s"%num.value)


if __name__ == "__main__":
    num = multiprocessing.Value("d",10.0)         #主進程與子進程共用這個value
    print("這個共用數值: %s"%num.value)

    for i in range(5):
        num = multiprocessing.Value("d", i)      #聲明進程,並傳遞1,2,3,4這幾個數
        proc = multiprocessing.Process(target=func,args=(num,))
        proc.start()                             #啟動進程
        #proc.join()
        print("最後列印數值: %s"%num.value)

共用數組(Array): 數組之間的共用,只需要使用multiprocessing.Array方法,即可實現.

import multiprocessing


def func(ary):       #子進程改變數組,主進程跟著改變
    ary[0]=100
    ary[1]=200
    ary[2]=300

''' i所對應的類型是ctypes.c_int,其他類型如下參考:
    'c': ctypes.c_char,  'u': ctypes.c_wchar,
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int,   'I': ctypes.c_uint,
    'l': ctypes.c_long,  'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double
'''

if __name__ == "__main__":
    ary = multiprocessing.Array("i",[1,2,3])   #主進程與子進程共用這個數組

    for i in range(5):
        proc = multiprocessing.Process(target=func,args=(ary,))
        print(ary[:])
        proc.start()

共用字典(dict): 通過使用Manager方法,實現兩個進程中的,字典與列表的數據共用.

import multiprocessing

def func(mydict, mylist):
    mydict["字典1"] = "值1"
    mydict["字典2"] = "值2"
    mylist.append(1)
    mylist.append(2)
    mylist.append(3)

if __name__ == "__main__":

    mydict = multiprocessing.Manager().dict()        #主進程與子進程共用字典
    mylist = multiprocessing.Manager().list()        #主進程與子進程共用列表

    proc = multiprocessing.Process(target=func,args=(mydict,mylist))
    proc.start()
    proc.join()

    print("列表中的元素: %s" %mylist)
    print("字典中的元素: %s" %mydict)

管道共用(Pipe): 通過Pipe管道的方式在兩個進程之間共用數據,類似於Socket套接字.

import multiprocessing

def func(conn):
    conn.send("你好我是子進程.")                      #發送消息給父進程
    print("父進程傳來了:",conn.recv())                #接收父進程傳來的消息
    conn.close()

if __name__ == "__main__":
    parent_conn,child_conn = multiprocessing.Pipe()  #管道創建兩個埠,一收一發送
    proc = multiprocessing.Process(target=func,args=(child_conn,))
    proc.start()

    print("子進程傳來了:",parent_conn.recv())         #接收子進程傳來的數據
    parent_conn.send("我是父進程,收到消息了..")        #父進程發送消息給子進程

本文作者: 王瑞
本文鏈接: https://www.lyshark.com/post/b4dd0803.html
版權聲明: 本博客所有文章除特別聲明外,均採用 BY-NC-SA 許可協議。轉載請註明出處!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • springboot項目通常配合mybatisplus來做數據CRUD。 我們在查詢或更新數據的時候,有時要用到in來過濾數據。比如SELECT * FROM emax_scbg_order WHERE order_no IN (1305679009380433922,130540525947283 ...
  • Lua程式設計第四版第二部分編程實操自做練習題答案,帶:star:為重點。 ## 9.1 > 請編寫一個函數integral,該函數以一個函數f為參數並返回其積分的近似值 使用右矩陣法近似積分值 ```lua function integral(f) return function(a, b) lo ...
  • 如果一個變數應該有一個固定的、不能改變的值,你可以使用`const`關鍵字。 `const`關鍵字將變數聲明為"常量",這意味著它是**不可改變和只讀**的。 **語法** `const CONSTNAME type = value` ## 聲明常量 聲明常量的示例: ```Go package m ...
  • 最近在寫支付的東西,調試時候需要讓支付平臺能夠回調本地介面來更新支付成功的狀態。但由於開發機器沒有公網IP,所以需要使用內網穿透來讓支付平臺能夠成功訪問到本地開發機器,這樣才能更高效率的進行調試。 推薦內網穿透的文章已經很多很多,還有很多大合集的推薦,但也因為推薦的太多,也會讓人眼花繚亂,不斷嘗試不 ...
  • 在自動化測試腳本的運行過程中,webdriver操作瀏覽器的時候,對於元素的定位是有一定的超時時間,大致應該在1-3秒的樣子,如果這個時間內仍然定位不到元素,就會拋出異常,中止腳本執行。我們可以通過在腳本中設置等待的方式來避免由於網路延遲或瀏覽器卡頓導致的偶然失敗,常用的等待方式有三種: ### 一 ...
  • 剛開始收到磁碟告警的時候,懷疑是日誌級別問題,業務日誌輸出過多導致磁碟打滿。但是查看我們自己的業務日誌文件目錄,每個日誌文件內容都不是很大。 ...
  • 1. <?php //單鏈表 class node { public $id; //節點id public $name; //節點名稱 public $next; //下一節點 public function __construct($id, $name) { $this->id = $id; $t ...
  • 我在上一章《形象談JVM-第一章-認識JVM》提到的“翻譯”,其實就是我們今天所說的“編譯”的概念。 上一章原文鏈接:https://www.cnblogs.com/xingxiangtan/p/17617654.html 原文: 【 虛擬機的職責是將位元組碼翻譯成對應系統能夠識別並執行的機器碼, 比 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...