Python3 系列之 並行編程

来源:https://www.cnblogs.com/hippieZhou/archive/2019/09/13/10205308.html
-Advertisement-
Play Games

進程和線程 進程是程式運行的實例。一個進程裡面可以包含多個線程,因此同一進程下的多個線程之間可以共用線程內的所有資源,它是操作系統動態運行的基本單元;每一個線程是進程下的一個實例,可以動態調度和獨立運行,由於線程和進程有很多類似的特點,因此,線程又被稱為輕量級的進程。線程的運行在進程之下,進程的存在 ...


進程和線程

進程是程式運行的實例。一個進程裡面可以包含多個線程,因此同一進程下的多個線程之間可以共用線程內的所有資源,它是操作系統動態運行的基本單元;每一個線程是進程下的一個實例,可以動態調度和獨立運行,由於線程和進程有很多類似的特點,因此,線程又被稱為輕量級的進程。線程的運行在進程之下,進程的存在依賴於線程;

開胃菜

基於 Python3 創建一個簡單的進程示例

from threading import Thread
from time import sleep


class CookBook(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.message = "Hello Parallel Python CookBook!!\n"

    def print_message(self):
        print(self.message)

    def run(self):
        print("Thread Starting\n")
        x = 0
        while x < 10:
            self.print_message()
            sleep(2)
            x += 1
        print("Thread Ended!\n")


print("Process Started")
hello_python = CookBook()

hello_python.start()
print("Process Ended")

需要註意的是,永遠不要讓線程在後臺默默執行,當其執行完畢後要及時釋放資源。

基於線程的並行

多線程編程一般使用共用記憶體空間進行線程間的通信,這就使管理記憶體空間成為多線程編程的關鍵。Python 通過標準庫 threading 模塊來管理線程,具有以下的組件:

  • 線程對象
  • Lock 對象
  • RLock 對象
  • 信號對象
  • 條件對象
  • 事件對象

定義一個線程

基本語法

示例代碼如下所示

import threading


def function(i):
    print("function called by thread: {0}".format(i))
    return


threads = []
for i in range(5):
    t = threading.Thread(target=function, args=(i,))
    threads.append(t)
    t.start()

lambda t, threads: t.join()

需要註意的是,線程創建後並不會自動運行,需要主動調用 start() 方法來啟動線程,join() 會讓調用它的線程被阻塞直到執行結束。(PS:可通過調用 t.setDaemon(True) 使其為後臺線程避免主線程被阻塞)

線程定位

示例代碼如下所示

import threading
import time


def first_function():
    print("{0} is starting".format(threading.currentThread().getName()))
    time.sleep(2)
    print("{0} is Exiting".format(threading.currentThread().getName()))


def second_function():
    print("{0} is starting".format(threading.currentThread().getName()))
    time.sleep(2)
    print("{0} is Exiting".format(threading.currentThread().getName()))


def third_function():
    print("{0} is starting".format(threading.currentThread().getName()))
    time.sleep(2)
    print("{0} is Exiting".format(threading.currentThread().getName()))

if __name__ == "__main__":
    t1 = threading.Thread(target=first_function,name="first")
    t2 = threading.Thread(target=second_function,name="second")
    t3 = threading.Thread(target=third_function,name="third")

    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()

通過設置 threading.Thread() 函數的 name 參數來設置線程名稱,通過 threading.currentThread().getName() 來獲取當前線程名稱;線程的預設名稱會以 Thread-{i} 格式來定義

自定義一個線程對象

示例代碼如下所示

import threading
import time

exitFlag = 0


class myThread(threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter

    def run(self):
        print("Starting:{0}".format(self.name))
        print_time(self.name, self.counter, 5)
        print("Exiting:{0}".format(self.name))


def print_time(threadName, delay, counter):
    while counter:
        if exitFlag:
            thread.exit()
        time.sleep(delay)
        print("{0} {1}".format(threadName, time.ctime(time.time())))
        counter -= 1


t1 = myThread(1, "Thread-1", 1)
t2 = myThread(2, "Thread-2", 1)

t1.start()
t2.start()

t1.join()
t2.join()

print("Exiting Main Thread.")

如果想自定義一個線程對象,首先就是要定義一個繼承 threading.Thread 類的子類,實現構造函數, 並重寫 run() 方法即可。

線程同步

Lock

示例代碼如下所示

import threading

shared_resource_with_lock = 0
shared_resource_with_no_lock = 0
COUNT = 100000
shared_resource_lock = threading.Lock()


def increment_with_lock():
    global shared_resource_with_lock
    for i in range(COUNT):
        shared_resource_lock.acquire()
        shared_resource_with_lock += 1
        shared_resource_lock.release()


def decrement_with_lock():
    global shared_resource_with_lock
    for i in range(COUNT):
        shared_resource_lock.acquire()
        shared_resource_with_lock -= 1
        shared_resource_lock.release()


def increment_without_lock():
    global shared_resource_with_no_lock
    for i in range(COUNT):
        shared_resource_with_no_lock += 1


def decrement_wthout_lock():
    global shared_resource_with_no_lock
    for i in range(COUNT):
        shared_resource_with_no_lock -= 1


if __name__ == "__main__":
    t1 = threading.Thread(target=increment_with_lock)
    t2 = threading.Thread(target=decrement_with_lock)
    t3 = threading.Thread(target=increment_without_lock)
    t4 = threading.Thread(target=decrement_wthout_lock)
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t1.join()
    t2.join()
    t3.join()
    t4.join()
    print("the value of shared variable with lock management is :{0}".format(
        shared_resource_with_lock))
    print("the value of shared variable with race condition is :{0}".format(
        shared_resource_with_no_lock))

通過 threading.Lock() 方法我們可以拿到線程鎖,一般有兩種操作方式:acquire()release() 在兩者之間是加鎖狀態,如果釋放失敗的話會顯示 RuntimError() 的異常。

RLock

RLock 也叫遞歸鎖,和 Lock 的區別在於:誰拿到誰釋放,是通過 threading.RLock() 來拿到的;

示例代碼如下所示

import threading
import time


class Box(object):
    lock = threading.RLock()

    def __init__(self):
        self.total_items = 0

    def execute(self, n):
        Box.lock.acquire()
        self.total_items += n
        Box.lock.release()

    def add(self):
        Box.lock.acquire()
        self.execute(1)
        Box.lock.release()

    def remove(self):
        Box.lock.acquire()
        self.execute(-1)
        Box.lock.release()


def adder(box, items):
    while items > 0:
        print("adding 1 item in the box")
        box.add()
        time.sleep(1)
        items -= 1


def remover(box, items):
    while items > 0:
        print("removing 1 item in the box")
        box.remove()
        time.sleep(1)
        items -= 1


if __name__ == "__main__":
    items = 5
    print("putting {0} items in the box".format(items))
    box = Box()
    t1 = threading.Thread(target=adder, args=(box, items))
    t2 = threading.Thread(target=remover, args=(box, items))

    t1.start()
    t2.start()

    t1.join()
    t2.join()
    print("{0} items still remain in the box".format(box.total_items))

信號量

示例代碼如下所示

import threading
import time
import random

semaphore = threading.Semaphore(0)


def consumer():
    print("Consumer is waiting.")
    semaphore.acquire()
    print("Consumer notify:consumed item numbers {0}".format(item))


def producer():
    global item
    time.sleep(10)
    item = random.randint(0, 10000)
    print("producer notify:produced item number {0}".format(item))
    semaphore.release()


if __name__ == "__main__":
    for i in range(0, 5):
        t1 = threading.Thread(target=producer)
        t2 = threading.Thread(target=consumer)
        t1.start()
        t2.start()
        t1.join()
        t2.join()

    print("program terminated.")

信號量初始化為 0 ,然後在兩個並行線程中,通過調用 semaphore.acquire() 函數會阻塞消費者線程,直到 semaphore.release() 在生產者中被調用,這裡模擬了生產者-消費者 模式來進行了測試;如果信號量的計數器到了0,就會阻塞 acquire() 方法,直到得到另一個線程的通知。如果信號量的計數器大於0,就會對這個值-1然後分配資源。

使用條件進行線程同步

解釋條件機制最好的例子還是生產者-消費者問題。在本例中,只要緩存不滿,生產者一直向緩存生產;只要緩存不空,消費者一直從緩存取出(之後銷毀)。當緩衝隊列不為空的時候,生產者將通知消費者;當緩衝隊列不滿的時候,消費者將通知生產者。

示例代碼如下所示

from threading import Thread, Condition
import time

items = []
condition = Condition()


class consumer(Thread):
    def __init__(self):
        Thread.__init__(self)

    def consume(self):
        global condition
        global items
        condition.acquire()
        if len(items) == 0:
            condition.wait()
            print("Consumer notify:no item to consum")
        items.pop()
        print("Consumer notify: consumed 1 item")
        print("Consumer notify: item to consume are:{0}".format(len(items)))

        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 20):
            time.sleep(2)
            self.consume()


class producer(Thread):
    def __init__(self):
        Thread.__init__(self)

    def produce(self):
        global condition
        global items
        condition.acquire()
        if len(items) == 10:
            condition.wait()
            print("Producer notify:items producted are:{0}".format(len(items)))
            print("Producer notify:stop the production!!")
        items.append(1)
        print("Producer notify:total items producted:{0}".format(len(items)))
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 20):
            time.sleep(1)
            self.produce()


if __name__ == "__main__":
    producer = producer()
    consumer = consumer()
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()

通過 condition.acquire() 來獲取鎖對象,condition.wait() 會使當前線程進入阻塞狀態,直到收到 condition.notify() 信號,同時,調用信號的通知的對象也要及時調用 condition.release() 來釋放資源;

使用事件進行線程同步

事件是線程之間用於通信的對。有的線程等待信號,有的線程發出信號。

示例代碼如下所示

import time
from threading import Thread, Event
import random

items = []
event = Event()


class consumer(Thread):
    def __init__(self, items, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        while True:
            time.sleep(2)
            self.event.wait()
            item = self.items.pop()
            print('Consumer notify:{0} popped from list by {1}'.format(
                item, self.name))


class producer(Thread):
    def __init__(self, integers, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        global item
        for i in range(100):
            time.sleep(2)
            item = random.randint(0, 256)
            self.items.append(item)
            print('Producer notify: item  N° %d appended to list by %s' %
                  (item, self.name))
            print('Producer notify: event set by %s' % self.name)
            self.event.set()
            print('Produce notify: event cleared by %s ' % self.name)
            self.event.clear()


if __name__ == "__main__":
    t1 = producer(items, event)
    t2 = consumer(items, event)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

使用 with 語法簡化代碼

import threading
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s')


def threading_with(statement):
    with statement:
        logging.debug("%s acquired via with" % statement)


def Threading_not_with(statement):
    statement.acquire()
    try:
        logging.debug("%s acquired directly " % statement)
    finally:
        statement.release()


if __name__ == "__main__":
    lock = threading.Lock()
    rlock = threading.RLock()
    condition = threading.Condition()
    mutex = threading.Semaphore(1)
    threading_synchronization_list = [lock, rlock, condition, mutex]

    for statement in threading_synchronization_list:
        t1 = threading.Thread(target=threading_with, args=(statement,))
        t2 = threading.Thread(target=Threading_not_with, args=(statement,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()

使用 queue 進行線程通信

Queue 常用的方法有以下四個:

  • put():往 queue 中添加一個元素
  • get():從 queue 中刪除一個元素,並返回該元素
  • task_done():每次元素被處理的時候都需要調用這個方法
  • join():所有元素都被處理之前一直阻塞
from threading import Thread, Event
from queue import Queue
import time
import random


class producer(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        for i in range(10):
            item = random.randint(0, 256)
            self.queue.put(item)
            print("Producer notify: item item N° %d appended to queue by %s" %
                  (item, self.name))
            time.sleep(1)


class consumer(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            item = self.queue.get()
            print('Consumer notify : %d popped from queue by %s' %
                  (item, self.name))
            self.queue.task_done()


if __name__ == "__main__":
    queue = Queue()
    t1 = producer(queue)
    t2 = consumer(queue)
    t3 = consumer(queue)
    t4 = consumer(queue)
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t1.join()
    t2.join()
    t3.join()
    t4.join()

基於進程的並行

multiprocessing 是 Python 標準庫中的模塊,實現了共用記憶體機制。

非同步編程

使用 concurrent.futures 模塊

該模塊具有線程池和進程池,管理並行編程任務、處理非確定性的執行流程、進程/線程同步等功能;此模塊由以下部分組成

  • concurrent.futures.Executor: 這是一個虛擬基類,提供了非同步執行的方法。
  • submit(function, argument): 調度函數(可調用的對象)的執行,將 argument 作為參數傳入。
  • map(function, argument): 將 argument 作為參數執行函數,以 非同步 的方式。
  • shutdown(Wait=True): 發出讓執行者釋放所有資源的信號。
  • concurrent.futures.Future: 其中包括函數的非同步執行。Future對象是submit任務(即帶有參數的functions)到executor的實例。

示例代碼如下所示

import concurrent.futures
import time

number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


def evaluate_item(x):
    result_item = count(x)
    return result_item


def count(number):
    for i in range(0, 1000000):
        i = i + 1
    return i * number


if __name__ == "__main__":
    # 順序執行
    start_time = time.time()
    for item in number_list:
        print(evaluate_item(item))
    print("Sequential execution in " + str(time.time() - start_time), "seconds")
    # 線程池執行
    start_time_1 = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(evaluate_item, item)
                   for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Thread pool execution in " +
          str(time.time() - start_time_1), "seconds")
    # 線程池執行
    start_time_2 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(evaluate_item, item)
                   for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Process pool execution in " +
          str(time.time() - start_time_2), "seconds")

使用 Asyncio 管理事件迴圈

Python 的 Asyncio 模塊提供了管理事件、協程、任務和線程的方法,以及編寫併發代碼的原語。此模塊的主要組件和概念包括:

  • 事件迴圈: 在Asyncio模塊中,每一個進程都有一個事件迴圈。
  • 協程: 這是子程式的泛化概念。協程可以在執行期間暫停,這樣就可以等待外部的處理(例如IO)完成之後,從之前暫停的地方恢復執行。
  • Futures: 定義了 Future 對象,和 concurrent.futures 模塊一樣,表示尚未完成的計算。
  • Tasks: 這是Asyncio的子類,用於封裝和管理並行模式下的協程。

Asyncio 提供了以下方法來管理事件迴圈:

  • loop = get_event_loop(): 得到當前上下文的事件迴圈。
  • loop.call_later(time_delay, callback, argument): 延後 time_delay 秒再執行 callback 方法。
  • loop.call_soon(callback, argument): 儘可能快調用 callback, call_soon() 函數結束,主線程回到事件迴圈之後就會馬上調用 callback 。
  • loop.time(): 以float類型返回當前時間迴圈的內部時間。
  • asyncio.set_event_loop(): 為當前上下文設置事件迴圈。
  • asyncio.new_event_loop(): 根據此策略創建一個新的時間迴圈並返回。
  • loop.run_forever(): 在調用 stop() 之前將一直運行。

示例代碼如下所示

import asyncio
import datetime
import time


def fuction_1(end_time, loop):
    print("function_1 called")
    if(loop.time() + 1.0) < end_time:
        loop.call_later(1, fuction_2, end_time, loop)
    else:
        loop.stop()


def fuction_2(end_time, loop):
    print("function_2 called")
    if(loop.time() + 1.0) < end_time:
        loop.call_later(1, function_3, end_time, loop)
    else:
        loop.stop()


def function_3(end_time, loop):
    print("function_3 called")
    if(loop.time() + 1.0) < end_time:
        loop.call_later(1, fuction_1, end_time, loop)
    else:
        loop.stop()


def function_4(end_time, loop):
    print("function_4 called")
    if(loop.time() + 1.0) < end_time:
        loop.call_later(1, function_4, end_time, loop)
    else:
        loop.stop()


loop = asyncio.get_event_loop()

end_loop = loop.time() + 9.0
loop.call_soon(fuction_1, end_loop, loop)
loop.run_forever()
loop.close()

使用 Asyncio 管理協程

示例代碼如下所示

import asyncio
import time
from random import randint


@asyncio.coroutine
def StartState():
    print("Start State called \n")
    input_val = randint(0, 1)
    time.sleep(1)
    if input_val == 0:
        result = yield from State2(input_val)
    else:
        result = yield from State1(input_val)
    print("Resume of the Transition:\nStart State calling" + result)


@asyncio.coroutine
def State1(transition_value):
    outputVal = str("State 1 with transition value=%s \n" % (transition_value))
    input_val = randint(0, 1)
    time.sleep(1)
    print("...Evaluating...")
    if input_val == 0:
        result = yield from State3(input_val)
    else:
        result = yield from State2(input_val)


@asyncio.coroutine
def State2(transition_value):
    outputVal = str("State 2 with transition value= %s \n" %
                    (transition_value))
    input_Val = randint(0, 1)
    time.sleep(1)
    print("...Evaluating...")
    if (input_Val == 0):
        result = yield from State1(input_Val)
    else:
        result = yield from State3(input_Val)
    result = "State 2 calling " + result
    return outputVal + str(result)


@asyncio.coroutine
def State3(transition_value):
    outputVal = str("State 3 with transition value = %s \n" %
                    (transition_value))
    input_val = randint(0, 1)
    time.sleep(1)
    print("...Evaluating...")
    if(input_val == 0):
        result = yield from State1(input_val)
    else:
        result = yield from State2(input_val)
    result = "State 3 calling " + result
    return outputVal + str(result)


@asyncio.coroutine
def EndState(transition_value):
    outputVal = str("End State With transition value = %s \n" %
                    (transition_value))
    print("...Stop Computation...")
    return outputVal


if __name__ == "__main__":
    print("Finites State Machine simulation with Asyncio Coroutine")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(StartState())

使用 Asyncio 控制任務

示例代碼如下所示

import asyncio


@asyncio.coroutine
def factorial(number):
    f = 1
    for i in range(2, number + 1):
        print("Asyncio.Task:Compute factorial(%s)" % (i))
        yield from asyncio.sleep(1)
        f *= i
    print("Asyncio.Task - factorial(%s) = %s" % (number, f))


@asyncio.coroutine
def fibonacci(number):
    a, b = 0, 1
    for i in range(number):
        print("Asyncio.Task:Complete fibonacci (%s)" % (i))
        yield from asyncio.sleep(1)
        a, b = b, a+b
    print("Asyncio.Task - fibonaci (%s)= %s" % (number, a))


@asyncio.coroutine
def binomialCoeff(n, k):
    result = 1
    for i in range(1, k+1):
        result = result * (n-i+1) / i
        print("Asyncio.Task:Compute binomialCoeff (%s)" % (i))
        yield from asyncio.sleep(1)
    print("Asyncio.Task - binomialCoeff (%s,%s) = %s" % (n, k, result))


if __name__ == "__main__":
    tasks = [asyncio.Task(factorial(10)), asyncio.Task(
        fibonacci(10)), asyncio.Task(binomialCoeff(20, 10))]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

使用Asyncio和Futures

示例代碼如下所示

import asyncio
import sys


@asyncio.coroutine
def first_coroutine(future, N):
    count = 0
    for i in range(1, N + 1):
        count = count + i
    yield from asyncio.sleep(4)
    future.set_result(
        "first coroutine (sum of N integers) result = " + str(count))


@asyncio.coroutine
def second_coroutine(future, N):
    count = 1
    for i in range(2, N + 1):
        count *= i
    yield from asyncio.sleep(3)
    future.set_result("second coroutine (factorial) result = " + str(count))


def got_result(future):
    print(future.result())


if __name__ == "__main__":
    N1 = 1
    N2 = 1
    loop = asyncio.get_event_loop()
    future1 = asyncio.Future()
    future2 = asyncio.Future()
    tasks = [
        first_coroutine(future1, N1),
        second_coroutine(future2, N2)
    ]
    future1.add_done_callback(got_result)
    future2.add_done_callback(got_result)
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

分散式編程

GPU 編程

相關參考


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • PHP+Nginx環境搭建 作者:王宇陽( Mirror )^_^ 參考文章: ​ "Nginx+PHP+MySQL安裝參考" ​ "PHP源碼安裝經驗" ​ "PHP源碼環境搭建過程中常見問題" CentOS環境 配置CentOS 7網路: CentOS(最小安裝)預設是不打開網路的 啟動網路 v ...
  • 1、在GitHub上建一個新倉庫 ​ 2、配置Git的SSH KEY 生成SSH添加到GitHub 回到你的git bash中, git config global user.name "yourname" git config global user.email "youremail" 這裡的yo ...
  • 多進程 python中創建進程模塊為: 進程之間數據不是共用的 註意:windowns下進程運行會報錯,linux mac 不會出現此問題。解決方法: 將進程啟動代碼放到main里即可執行,示例代碼: 進程常用方法: :和線程一樣,預設會等子進程執行完畢後,代碼才會繼續往下執行 : True 為 主 ...
  • 1、安裝 Git 和 nodejs "https://hexo.io/zh cn/docs/" ​ 2 安裝Hexo "https://hexo.io/zh cn/" ​ 3、更換伊卡洛斯主題 "https://github.com/ppoffice/hexo theme icarus" ​ 配置新 ...
  • 一、寫在前面 說道程式員,你會想到什麼呢?有人認為程式員象徵著高薪,有人認為程式員都是死肥宅,還有人想到的則是996和 ICU。 別人眼中的程式員:飛快的敲擊鍵盤、酷炫的切換屏幕、各種看不懂的字元代碼。 然而現實中的程式員呢?對於很多程式員來說,沒有百度和 Google 解決不了的問題,也沒有 ct ...
  • 三、在碼雲平臺創建項目 git服務平臺: 主要使用github(最主流) 國內訪問速度慢 托管私有項目收費 國內一般使用碼雲gitee 國內訪問速度快 托管私有項目免費(限制開發人數) 公司中使用gitlab或者svn來搭建 主要使用github(最主流) 國內訪問速度慢 托管私有項目收費 國內訪問 ...
  • maven是一個基於java平臺的自動化構建工具。構建工具的發展由make->ant->maven->gradle其中gradle還在發展中,使用較少,學習難度比較大,所以目前占據主流的還是maven。 maven的作用:①,幫助我們管理jar包:i,增加第三方jar包。先在本地倉庫中下載,如果本地 ...
  • 大家或許知道,Python 為了提高記憶體的利用效率,採用了一套共用對象記憶體的分配策略。 例如,對於那些數值較小的數字對象([ 5, 256])、布爾值對象、None 對象、較短的字元串對象( 通常 是 20)等等,字面量相等的對象實際上是同一個對象。 我很早的時候曾寫過一篇《 "Python中的“特 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...