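Processes and Threads
A process is a running instance of a program and is the basic unit that the operating system runs dynamically. A process can contain multiple threads, and the threads within the same process share that process's resources, such as its memory. Each thread is an execution instance within a process that can be scheduled dynamically and run independently. Because threads resemble processes in many ways, a thread is also called a lightweight process. Threads run inside a process, so a thread's existence depends on its process.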
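To make the relationship concrete, here is a minimal sketch (the names `shared_log` and `worker` are illustrative and not part of the original text). It starts a few threads inside one process; each thread records the same process ID and appends to the same list, which suggests that they all work in the memory of a single process.

```python
import os
import threading

shared_log = []  # hypothetical list living in the process's memory

def worker(tag):
    # Every thread sees the same process ID and the same list object.
    shared_log.append((tag, os.getpid()))

threads = [threading.Thread(target=worker, args=(n,)) for n in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(shared_log))
```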
Appetizer
A simple thread example in Python 3:

from threading import Thread
from time import sleep

class CookBook(Thread):
    def __init__(self):
        super().__init__()
        self.message = "Hello Parallel Python CookBook!!\n"

    def print_message(self):
        print(self.message)

    def run(self):
        # run() is the body executed in the new thread after start().
        print("Thread Starting\n")
        x = 0
        while x < 10:
            self.print_message()
            sleep(2)
            x += 1
        print("Thread Ended!\n")

print("Process Started")
hello_python = CookBook()
hello_python.start()
# There is no join() here, so this line prints while the thread is still running.
print("Process Ended")

Note: never leave a thread running silently in the background; once it has finished, release its resources promptly.
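Thread-Based Parallelism
Multithreaded programs usually communicate through shared memory, which makes managing that shared memory the central concern of multithreaded programming. Python manages threads through the standard-library threading module, which provides the following components:
- Thread objects
- Lock objects
- RLock objects
- Semaphore objects
- Condition objects
- Event objects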
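Defining a thread
Basic syntax
Example code:

import threading

def function(i):
    print("function called by thread: {0}".format(i))

threads = []
for i in range(5):
    t = threading.Thread(target=function, args=(i,))
    threads.append(t)
    t.start()

# Wait for every thread to finish.
for t in threads:
    t.join()

Note that a thread does not run automatically after it is created; call start() to launch it. join() blocks the calling thread until the thread it is called on has finished. (PS: setting t.daemon = True before start() makes it a daemon thread, which does not keep the program alive once the main thread exits; the older t.setDaemon(True) spelling is deprecated since Python 3.10.)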
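The interplay of start(), join() and the daemon flag can be sketched as follows. This is only an illustration under assumed names (`slow_task`, `stop`): the main thread waits briefly with `join(timeout=...)`, checks `is_alive()`, and then asks the worker to stop.

```python
import threading

def slow_task(stop_event):
    # Wait until asked to stop (or at most 5 seconds).
    stop_event.wait(timeout=5)

stop = threading.Event()
t = threading.Thread(target=slow_task, args=(stop,), daemon=True)
t.start()

t.join(timeout=0.1)            # give up waiting after 0.1 s
still_running = t.is_alive()   # the worker has not finished yet

stop.set()                     # let the worker finish
t.join()                       # now wait until it really ends
finished = not t.is_alive()

print(still_running, finished)
```

Passing a timeout to join() keeps the caller from blocking indefinitely; the thread itself keeps running until its function returns.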
Identifying the current thread
Example code:

import threading
import time

def first_function():
    print("{0} is starting".format(threading.current_thread().name))
    time.sleep(2)
    print("{0} is Exiting".format(threading.current_thread().name))

def second_function():
    print("{0} is starting".format(threading.current_thread().name))
    time.sleep(2)
    print("{0} is Exiting".format(threading.current_thread().name))

def third_function():
    print("{0} is starting".format(threading.current_thread().name))
    time.sleep(2)
    print("{0} is Exiting".format(threading.current_thread().name))

if __name__ == "__main__":
    t1 = threading.Thread(target=first_function, name="first")
    t2 = threading.Thread(target=second_function, name="second")
    t3 = threading.Thread(target=third_function, name="third")
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()

Set a thread's name through the name argument of threading.Thread(), and read the current thread's name with threading.current_thread().name (the older threading.currentThread().getName() spelling is deprecated since Python 3.10). If no name is given, the default name starts with Thread-{i}.
Defining a custom thread class
Example code:

import threading
import time

exitFlag = 0

class myThread(threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter

    def run(self):
        print("Starting:{0}".format(self.name))
        print_time(self.name, self.counter, 5)
        print("Exiting:{0}".format(self.name))

def print_time(threadName, delay, counter):
    while counter:
        if exitFlag:
            # The Python 2 call thread.exit() does not exist in Python 3;
            # returning ends the work early instead.
            return
        time.sleep(delay)
        print("{0} {1}".format(threadName, time.ctime(time.time())))
        counter -= 1

t1 = myThread(1, "Thread-1", 1)
t2 = myThread(2, "Thread-2", 1)
t1.start()
t2.start()
t1.join()
t2.join()
print("Exiting Main Thread.")

To define a custom thread class, subclass threading.Thread, implement the constructor (calling the base-class constructor first), and override run().
Thread Synchronization
Lock
Example code:

import threading

shared_resource_with_lock = 0
shared_resource_with_no_lock = 0
COUNT = 100000
shared_resource_lock = threading.Lock()

def increment_with_lock():
    global shared_resource_with_lock
    for i in range(COUNT):
        shared_resource_lock.acquire()
        shared_resource_with_lock += 1
        shared_resource_lock.release()

def decrement_with_lock():
    global shared_resource_with_lock
    for i in range(COUNT):
        shared_resource_lock.acquire()
        shared_resource_with_lock -= 1
        shared_resource_lock.release()

def increment_without_lock():
    global shared_resource_with_no_lock
    for i in range(COUNT):
        shared_resource_with_no_lock += 1

def decrement_without_lock():
    global shared_resource_with_no_lock
    for i in range(COUNT):
        shared_resource_with_no_lock -= 1

if __name__ == "__main__":
    t1 = threading.Thread(target=increment_with_lock)
    t2 = threading.Thread(target=decrement_with_lock)
    t3 = threading.Thread(target=increment_without_lock)
    t4 = threading.Thread(target=decrement_without_lock)
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t1.join()
    t2.join()
    t3.join()
    t4.join()
    print("the value of shared variable with lock management is :{0}".format(
        shared_resource_with_lock))
    print("the value of shared variable with race condition is :{0}".format(
        shared_resource_with_no_lock))

threading.Lock() returns a lock object with two basic operations, acquire() and release(); the lock is held between the two calls. Calling release() on a lock that is not held raises RuntimeError.
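The acquire/release life cycle and the failure case can be checked with a small sketch. The variable names `held` and `error` are illustrative; the point is only that a lock reports `locked()` while held and that a second `release()` raises RuntimeError.

```python
import threading

lock = threading.Lock()

lock.acquire()
held = lock.locked()      # the lock is held between acquire() and release()
lock.release()

try:
    lock.release()        # the lock is already unlocked at this point
except RuntimeError as exc:
    error = exc
else:
    error = None

print(held, lock.locked(), type(error).__name__)
```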
RLock
RLock, also called a reentrant (recursive) lock, is obtained through threading.RLock(). It differs from Lock in that only the thread that acquired it may release it, and that thread may acquire it again without blocking, as long as every acquire() is matched by a release().
Example code:

import threading
import time

class Box(object):
    lock = threading.RLock()

    def __init__(self):
        self.total_items = 0

    def execute(self, n):
        # Called while add()/remove() already hold the lock: this needs an RLock.
        Box.lock.acquire()
        self.total_items += n
        Box.lock.release()

    def add(self):
        Box.lock.acquire()
        self.execute(1)
        Box.lock.release()

    def remove(self):
        Box.lock.acquire()
        self.execute(-1)
        Box.lock.release()

def adder(box, items):
    while items > 0:
        print("adding 1 item in the box")
        box.add()
        time.sleep(1)
        items -= 1

def remover(box, items):
    while items > 0:
        print("removing 1 item in the box")
        box.remove()
        time.sleep(1)
        items -= 1

if __name__ == "__main__":
    items = 5
    print("putting {0} items in the box".format(items))
    box = Box()
    t1 = threading.Thread(target=adder, args=(box, items))
    t2 = threading.Thread(target=remover, args=(box, items))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("{0} items still remain in the box".format(box.total_items))
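The difference between Lock and RLock can be probed directly. The helper `can_reacquire` below is a hypothetical name introduced for this sketch; it tries to take a lock twice from the same thread, using a non-blocking second attempt so that a plain Lock fails instead of deadlocking.

```python
import threading

def can_reacquire(lock):
    """Return True if the current thread can take `lock` twice in a row."""
    lock.acquire()
    try:
        # Non-blocking attempt: a plain Lock returns False here,
        # an RLock lets its owner in again.
        second = lock.acquire(blocking=False)
        if second:
            lock.release()
        return second
    finally:
        lock.release()

print(can_reacquire(threading.Lock()))
print(can_reacquire(threading.RLock()))
```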
Semaphore
Example code:

import threading
import time
import random

semaphore = threading.Semaphore(0)

def consumer():
    print("Consumer is waiting.")
    semaphore.acquire()
    print("Consumer notify: consumed item number {0}".format(item))

def producer():
    global item
    time.sleep(10)
    item = random.randint(0, 10000)
    print("producer notify: produced item number {0}".format(item))
    semaphore.release()

if __name__ == "__main__":
    for i in range(0, 5):
        t1 = threading.Thread(target=producer)
        t2 = threading.Thread(target=consumer)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    print("program terminated.")

The semaphore is initialized to 0. Of the two concurrent threads, the consumer blocks in semaphore.acquire() until the producer calls semaphore.release(); this simulates the producer-consumer pattern. When the semaphore's counter is 0, acquire() blocks until another thread calls release(). When the counter is greater than 0, acquire() decrements it by 1 and grants access to the resource.
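The counter behaviour described above can be observed without any threads. In this sketch (the name `slots` is illustrative) every acquire is non-blocking, so an attempt on an exhausted semaphore returns False instead of waiting.

```python
import threading

slots = threading.Semaphore(2)           # counter starts at 2

first = slots.acquire(blocking=False)    # counter 2 -> 1
second = slots.acquire(blocking=False)   # counter 1 -> 0
third = slots.acquire(blocking=False)    # counter is 0, so this attempt fails
slots.release()                          # counter 0 -> 1
fourth = slots.acquire(blocking=False)   # succeeds again

print(first, second, third, fourth)
```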
Thread synchronization with a condition
The producer-consumer problem is still the best example for explaining the condition mechanism. In this example, the producer keeps adding to the buffer as long as it is not full, and the consumer keeps removing items from the buffer (and discarding them) as long as it is not empty. Once the buffer is no longer empty, the producer notifies the consumer; once the buffer is no longer full, the consumer notifies the producer.
Example code:

from threading import Thread, Condition
import time

items = []
condition = Condition()

class consumer(Thread):
    def __init__(self):
        Thread.__init__(self)

    def consume(self):
        condition.acquire()
        # Re-check in a loop: wait() may return while the list is still empty.
        while len(items) == 0:
            print("Consumer notify: no item to consume")
            condition.wait()
        items.pop()
        print("Consumer notify: consumed 1 item")
        print("Consumer notify: items to consume are: {0}".format(len(items)))
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 20):
            time.sleep(2)
            self.consume()

class producer(Thread):
    def __init__(self):
        Thread.__init__(self)

    def produce(self):
        condition.acquire()
        while len(items) == 10:
            print("Producer notify: items produced are: {0}".format(len(items)))
            print("Producer notify: stop the production!!")
            condition.wait()
        items.append(1)
        print("Producer notify: total items produced: {0}".format(len(items)))
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 20):
            time.sleep(1)
            self.produce()

if __name__ == "__main__":
    # Separate variable names so the class names are not shadowed.
    producer_thread = producer()
    consumer_thread = consumer()
    producer_thread.start()
    consumer_thread.start()
    producer_thread.join()
    consumer_thread.join()

condition.acquire() acquires the underlying lock. condition.wait() releases that lock and blocks the current thread until another thread calls condition.notify(), then re-acquires the lock before returning. The notifying thread must also call condition.release() promptly afterwards; otherwise the woken thread cannot proceed.
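As an alternative formulation of the same idea, the sketch below wraps the buffer in a small class and uses `Condition.wait_for()`, which re-checks a predicate each time the thread wakes up. `BoundedBuffer` and `run_demo` are hypothetical names for this illustration, not part of the original example.

```python
import threading
from collections import deque

class BoundedBuffer:
    """Hypothetical fixed-capacity buffer guarded by one Condition."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.items = deque()
        self.condition = threading.Condition()

    def put(self, item):
        with self.condition:
            # Block while the buffer is full.
            self.condition.wait_for(lambda: len(self.items) < self.capacity)
            self.items.append(item)
            self.condition.notify_all()

    def get(self):
        with self.condition:
            # Block while the buffer is empty.
            self.condition.wait_for(lambda: len(self.items) > 0)
            item = self.items.popleft()
            self.condition.notify_all()
            return item

def run_demo(count=10, capacity=3):
    buffer = BoundedBuffer(capacity)
    received = []

    def produce():
        for i in range(count):
            buffer.put(i)

    def consume():
        for _ in range(count):
            received.append(buffer.get())

    threads = [threading.Thread(target=produce), threading.Thread(target=consume)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return received

print(run_demo())
```

Using `with self.condition:` acquires and releases the underlying lock automatically, so a forgotten release() cannot leave the other thread stuck.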
Thread synchronization with an event
Events are objects used for communication between threads: some threads wait for a signal, and others emit it.
Example code:

import time
from threading import Thread, Event
import random

items = []
event = Event()

class consumer(Thread):
    def __init__(self, items, event):
        # Daemon thread: its endless loop must not keep the program alive.
        Thread.__init__(self, daemon=True)
        self.items = items
        self.event = event

    def run(self):
        while True:
            time.sleep(2)
            self.event.wait()
            item = self.items.pop()
            print('Consumer notify: {0} popped from list by {1}'.format(
                item, self.name))

class producer(Thread):
    def __init__(self, items, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        for i in range(100):
            time.sleep(2)
            item = random.randint(0, 256)
            self.items.append(item)
            print('Producer notify: item N° %d appended to list by %s' %
                  (item, self.name))
            print('Producer notify: event set by %s' % self.name)
            self.event.set()
            print('Producer notify: event cleared by %s' % self.name)
            self.event.clear()

if __name__ == "__main__":
    t1 = producer(items, event)
    t2 = consumer(items, event)
    t1.start()
    t2.start()
    # Join only the producer; the daemon consumer ends with the program.
    t1.join()
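The basic Event operations can be sketched in isolation. In this illustration (the names `ready`, `waiter` and `results` are assumptions) `wait()` returns False when it times out and True once `set()` has been called. The flag is deliberately left set, so a thread that starts waiting late still gets through, which is not guaranteed when set() is followed immediately by clear().

```python
import threading

ready = threading.Event()
results = []

def waiter():
    # wait() returns True once the flag is set, False on timeout.
    results.append(ready.wait(timeout=5))

timed_out = ready.wait(timeout=0.05)   # nobody has set the flag yet
t = threading.Thread(target=waiter)
t.start()
ready.set()                            # wakes every thread blocked in wait()
t.join()

print(timed_out, results, ready.is_set())
```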
Simplifying code with the with statement

import threading
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s')

def threading_with(statement):
    # The with block acquires on entry and releases on exit, even on error.
    with statement:
        logging.debug("%s acquired via with" % statement)

def threading_not_with(statement):
    statement.acquire()
    try:
        logging.debug("%s acquired directly " % statement)
    finally:
        statement.release()

if __name__ == "__main__":
    lock = threading.Lock()
    rlock = threading.RLock()
    condition = threading.Condition()
    mutex = threading.Semaphore(1)
    threading_synchronization_list = [lock, rlock, condition, mutex]
    for statement in threading_synchronization_list:
        t1 = threading.Thread(target=threading_with, args=(statement,))
        t2 = threading.Thread(target=threading_not_with, args=(statement,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
Thread communication with a queue
The four most commonly used Queue methods are:
- put(): adds an item to the queue
- get(): removes an item from the queue and returns it
- task_done(): must be called each time an item has been processed
- join(): blocks until all items have been processed

from threading import Thread
from queue import Queue
import time
import random

class producer(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        for i in range(10):
            item = random.randint(0, 256)
            self.queue.put(item)
            print("Producer notify: item N° %d appended to queue by %s" %
                  (item, self.name))
            time.sleep(1)

class consumer(Thread):
    def __init__(self, queue):
        # Daemon thread: its endless loop must not keep the program alive.
        Thread.__init__(self, daemon=True)
        self.queue = queue

    def run(self):
        while True:
            item = self.queue.get()
            print('Consumer notify : %d popped from queue by %s' %
                  (item, self.name))
            self.queue.task_done()

if __name__ == "__main__":
    queue = Queue()
    t1 = producer(queue)
    t2 = consumer(queue)
    t3 = consumer(queue)
    t4 = consumer(queue)
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t1.join()
    # The consumers never return, so wait for the queue to drain instead of joining them.
    queue.join()
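One common way to let consumers stop cleanly, sketched here under assumed names (`SENTINEL`, `consume`, `run_pipeline`), is to enqueue one stop marker per consumer. The sketch exercises all four methods listed above: put(), get(), task_done() and join().

```python
import threading
from queue import Queue

SENTINEL = None  # hypothetical marker telling a consumer to stop

def consume(q, out):
    while True:
        item = q.get()
        try:
            if item is SENTINEL:
                return
            out.append(item * 2)
        finally:
            q.task_done()       # exactly one task_done() per get()

def run_pipeline(values, workers=3):
    q, out = Queue(), []
    threads = [threading.Thread(target=consume, args=(q, out))
               for _ in range(workers)]
    for t in threads:
        t.start()
    for v in values:
        q.put(v)
    for _ in threads:
        q.put(SENTINEL)         # one sentinel per consumer
    q.join()                    # blocks until every item was marked done
    for t in threads:
        t.join()
    return sorted(out)

print(run_pipeline(range(5)))
```

With sentinels, every consumer thread returns on its own, so all threads can be joined normally and none has to be a daemon.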
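Process-Based Parallelism
multiprocessing is a module in the Python standard library that runs work in separate processes and implements a shared-memory mechanism for them.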
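Asynchronous Programming
Using the concurrent.futures module
The module provides thread pools and process pools and helps manage parallel tasks, non-deterministic execution flow, and process/thread synchronization. It consists of the following parts:
- concurrent.futures.Executor: an abstract base class that provides methods for executing calls asynchronously.
  - submit(fn, *args, **kwargs): schedules the callable fn to run with the given arguments and returns a Future.
  - map(fn, *iterables): applies fn to the items of the iterables asynchronously.
  - shutdown(wait=True): signals the executor to free all its resources.
- concurrent.futures.Future: encapsulates the asynchronous execution of a callable. A Future object is created when a task (a function together with its arguments) is submitted to an executor.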
Example code:

import concurrent.futures
import time

number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

def evaluate_item(x):
    result_item = count(x)
    return result_item

def count(number):
    # CPU-bound busy loop; i ends at 1000000.
    for i in range(0, 1000000):
        i = i + 1
    return i * number

if __name__ == "__main__":
    # Sequential execution
    start_time = time.time()
    for item in number_list:
        print(evaluate_item(item))
    print("Sequential execution in " + str(time.time() - start_time), "seconds")
    # Thread pool execution
    start_time_1 = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(evaluate_item, item)
                   for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Thread pool execution in " +
          str(time.time() - start_time_1), "seconds")
    # Process pool execution
    start_time_2 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(evaluate_item, item)
                   for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Process pool execution in " +
          str(time.time() - start_time_2), "seconds")
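For comparison with submit() and as_completed(), here is a short sketch of Executor.map(), which returns results in input order. The function `square` is an illustrative placeholder.

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

with ThreadPoolExecutor(max_workers=3) as executor:
    # map() yields results in input order, regardless of completion order.
    squares = list(executor.map(square, range(6)))
    # submit() returns a Future; result() blocks until the value is ready.
    future = executor.submit(pow, 2, 10)
    power = future.result()

print(squares, power)
```

Leaving the with block calls shutdown(wait=True) on the executor, so all submitted work has finished by the time the results are printed.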
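Managing the event loop with Asyncio
Python's asyncio module provides ways to manage events, coroutines, tasks, and threads, along with primitives for writing concurrent code. Its main components and concepts are:
- Event loop: asyncio runs its work on an event loop; a program typically has one.
- Coroutines: a generalization of subroutines. A coroutine can be suspended during execution to wait for external processing (such as I/O) and resumes from the point where it was suspended once that processing is complete.
- Futures: asyncio defines a Future object that, like the one in concurrent.futures, represents a computation that has not yet completed.
- Tasks: Task is a subclass of asyncio's Future, used to wrap coroutines and manage them so they run concurrently.
Asyncio provides the following methods for managing the event loop:
- loop = asyncio.get_event_loop(): returns the event loop for the current context.
- loop.call_later(delay, callback, *args): calls callback after delay seconds.
- loop.call_soon(callback, *args): schedules callback to run as soon as possible; it is called once call_soon() has returned and control is back in the event loop.
- loop.time(): returns the event loop's internal time as a float.
- asyncio.set_event_loop(loop): sets the event loop for the current context.
- asyncio.new_event_loop(): creates and returns a new event loop according to the current policy.
- loop.run_forever(): runs until stop() is called.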
Example code:

import asyncio

def function_1(end_time, loop):
    print("function_1 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_2, end_time, loop)
    else:
        loop.stop()

def function_2(end_time, loop):
    print("function_2 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_3, end_time, loop)
    else:
        loop.stop()

def function_3(end_time, loop):
    print("function_3 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_1, end_time, loop)
    else:
        loop.stop()

def function_4(end_time, loop):
    # Reschedules itself; not scheduled by the code in this example.
    print("function_4 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_4, end_time, loop)
    else:
        loop.stop()

# Create the loop explicitly; recent Python versions no longer create one
# implicitly in get_event_loop().
loop = asyncio.new_event_loop()
end_loop = loop.time() + 9.0
loop.call_soon(function_1, end_loop, loop)
loop.run_forever()
loop.close()
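The ordering of call_soon() and call_later() can be seen in a much smaller sketch. `run_callbacks` is a hypothetical helper; it schedules the delayed callback first, yet the call_soon() callback still runs first, and a final delayed `loop.stop()` ends run_forever().

```python
import asyncio

def run_callbacks():
    order = []
    loop = asyncio.new_event_loop()
    try:
        loop.call_later(0.02, order.append, "later")   # runs after ~20 ms
        loop.call_soon(order.append, "soon")           # runs on the next loop iteration
        loop.call_later(0.05, loop.stop)               # ends run_forever()
        loop.run_forever()
    finally:
        loop.close()
    return order

print(run_callbacks())
```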
Managing coroutines with Asyncio
Example code:

import asyncio
from random import randint

# Generator-based coroutines (@asyncio.coroutine with yield from) were removed
# in Python 3.11, so the states are written with async def and await.
# await asyncio.sleep(1) pauses a state without blocking the event loop.

async def StartState():
    print("Start State called \n")
    input_val = randint(0, 1)
    await asyncio.sleep(1)
    if input_val == 0:
        result = await State2(input_val)
    else:
        result = await State1(input_val)
    print("Resume of the Transition:\nStart State calling " + result)

async def State1(transition_value):
    outputVal = "State 1 with transition value = %s \n" % transition_value
    input_val = randint(0, 1)
    await asyncio.sleep(1)
    print("...Evaluating...")
    if input_val == 0:
        result = await State3(input_val)
    else:
        result = await State2(input_val)
    result = "State 1 calling " + result
    return outputVal + result

async def State2(transition_value):
    outputVal = "State 2 with transition value = %s \n" % transition_value
    input_val = randint(0, 1)
    await asyncio.sleep(1)
    print("...Evaluating...")
    if input_val == 0:
        result = await State1(input_val)
    else:
        result = await State3(input_val)
    result = "State 2 calling " + result
    return outputVal + result

async def State3(transition_value):
    outputVal = "State 3 with transition value = %s \n" % transition_value
    input_val = randint(0, 1)
    await asyncio.sleep(1)
    print("...Evaluating...")
    if input_val == 0:
        result = await State1(input_val)
    else:
        # EndState is the only terminal state; without this transition
        # the machine would never stop.
        result = await EndState(input_val)
    result = "State 3 calling " + result
    return outputVal + result

async def EndState(transition_value):
    outputVal = "End State with transition value = %s \n" % transition_value
    print("...Stop Computation...")
    return outputVal

if __name__ == "__main__":
    print("Finite State Machine simulation with Asyncio Coroutine")
    asyncio.run(StartState())
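To show the suspend-and-resume behaviour of coroutines on its own, the sketch below runs two coroutines concurrently and records the order of events. The names `step`, `main` and `log` are illustrative; the delays are chosen so that the second coroutine finishes while the first is still suspended.

```python
import asyncio

async def step(name, delay, log):
    log.append(name + " start")
    await asyncio.sleep(delay)   # suspend here; the loop runs other coroutines
    log.append(name + " end")    # resume exactly where execution was suspended

async def main():
    log = []
    # gather() runs both coroutines concurrently on the same event loop.
    await asyncio.gather(step("a", 0.05, log), step("b", 0.01, log))
    return log

log = asyncio.run(main())
print(log)
```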
Controlling tasks with Asyncio
Example code:

import asyncio

async def factorial(number):
    f = 1
    for i in range(2, number + 1):
        print("Asyncio.Task: Compute factorial(%s)" % (i))
        await asyncio.sleep(1)
        f *= i
    print("Asyncio.Task - factorial(%s) = %s" % (number, f))

async def fibonacci(number):
    a, b = 0, 1
    for i in range(number):
        print("Asyncio.Task: Compute fibonacci(%s)" % (i))
        await asyncio.sleep(1)
        a, b = b, a + b
    print("Asyncio.Task - fibonacci(%s) = %s" % (number, a))

async def binomialCoeff(n, k):
    result = 1
    for i in range(1, k + 1):
        result = result * (n - i + 1) / i
        print("Asyncio.Task: Compute binomialCoeff(%s)" % (i))
        await asyncio.sleep(1)
    print("Asyncio.Task - binomialCoeff(%s, %s) = %s" % (n, k, result))

async def main():
    # create_task() wraps each coroutine in a Task and schedules it on the running loop.
    tasks = [asyncio.create_task(factorial(10)),
             asyncio.create_task(fibonacci(10)),
             asyncio.create_task(binomialCoeff(20, 10))]
    await asyncio.wait(tasks)

if __name__ == "__main__":
    asyncio.run(main())
Using Asyncio with Futures
Example code:

import asyncio

async def first_coroutine(future, N):
    count = 0
    for i in range(1, N + 1):
        count = count + i
    await asyncio.sleep(4)
    future.set_result(
        "first coroutine (sum of N integers) result = " + str(count))

async def second_coroutine(future, N):
    count = 1
    for i in range(2, N + 1):
        count *= i
    await asyncio.sleep(3)
    future.set_result("second coroutine (factorial) result = " + str(count))

def got_result(future):
    # Done callback: runs once the future has a result.
    print(future.result())

async def main(N1, N2):
    # Futures are created on the running loop.
    loop = asyncio.get_running_loop()
    future1 = loop.create_future()
    future2 = loop.create_future()
    future1.add_done_callback(got_result)
    future2.add_done_callback(got_result)
    await asyncio.gather(first_coroutine(future1, N1),
                         second_coroutine(future2, N2))

if __name__ == "__main__":
    N1 = 1
    N2 = 1
    asyncio.run(main(N1, N2))
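The life cycle of a single Future can also be sketched in a few lines. Everything here is illustrative (`main`, `seen`, the value 42): the future starts out pending, is resolved a little later by a scheduled callback, and both the awaiting coroutine and the done callback receive the result.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()        # pending: no result yet
    seen = []
    future.add_done_callback(lambda f: seen.append(f.result()))
    was_done = future.done()
    # Resolve the future shortly afterwards from a scheduled callback.
    loop.call_later(0.01, future.set_result, 42)
    value = await future                 # suspends until set_result() runs
    await asyncio.sleep(0)               # give pending callbacks a chance to run
    return was_done, value, seen

print(asyncio.run(main()))
```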
Distributed Programming
Omitted.
GPU Programming
Omitted.