給自己的Python小筆記-Python-想同時執行多個Function函數嗎? 那來試試這個平行設計模組- 多執行緒(Multi-Threading)使用教學
1. 多執行緒(Multi- Threading),或稱多線程是什麼?
- 執行緒(thread)為作業系統(OS)排程的最小單位
- 多執行緒是指當我們要在同一個Python檔裡執行多組的程式(ex. 不同function),而且是要同時平行執行時
2. 為什麼要使用多執行緒(Multi-Threading)?
現代的電腦幾乎都具有相當多的核心,而我們執行程式,通常只會使用到一顆核心,但我們想要充分的發揮多核心硬體的 運算能力,就需要使用多執行緒(Multi-Threading)或多行程 Mult-Processing)等的平行運算技術
3. 為什麼要使用Python來實現多執行緒(Multi-Threading),並不會更快?
依照多執行緒(Mult. Threading)的平行設計,我們的程式不是應該運行的更快嗎,為什麼使用Python撰寫時, 不僅沒有更快,反而有時候更慢,其他語言在執行多執行續(Multi- Threading),是支援多核CPU同時執行多個執行緒的 ,但Python受到GIL(Global Interpreter Lock)的限制,導致無論在單核還是多核都是只支援一次只執行一個執行緒
GIL(Global Interpreter Lock) 原本是設計來保護數據的,一個執行緒要執行要先拿到GIL,才能進入到 CPU之中執行,反之沒有拿到GIL,就不能進入
4. 這樣Python還要用多執行緒(Multi-Threading)嗎?
GIL(Global interpreter Lock)只會影響那些需要很大程度依賴CPU的程式,像是要做大量計算的數據預處理算法,而 如果我們的程式只會影響到I/O操作,像是網路應用與手機控制等,就非常適合使用多執行緒(Multi-Threading), 因為它本來就需要花很多時間等待。
5. Python的解決辦法
改用多行程(Multi-Processing)的方式來執行,就不會受到GIL (Global Interpreter Lock)限制,我會在之後的文章中寫一篇來教大家喔
實作
6. 獲取執行緒(thread)的資訊方法
- threading.active_count(): 當前活動中的執行續數量,也可以寫成threading.activeCount()
- threading.current_thread: 當前正在使用的執行續,或寫成threading.currentThread()
- threading.enumerate(): 當前活動中的所有執行續資訊
程式範例
## 導入套件 import threading def threading_example(): ## 也可以寫成threading.activeCount()、threading.currentThread() print('活動中的執行續數量: ', threading.active_count()) print('當前正在使用的執行續: ', threading.current_thread()) print('當前正在使用的執行續名稱: ', threading.current_thread().name) print('目前活動中的執行續資訊: ', threading.enumerate) if __name__ == '__main__': threading_example()
執行結果
活動中的執行續數量: 5 當前正在使用的執行續: <_MainThread(MainThread, started 14512)> 當前正在使用的執行續名稱: MainThread 目前活動中的執行續資訊: <function enumerate at 0x000002A0E9831B88>
7. 創建執行續(thread)的方法
- 函數格式
threading.Thread(target = function, name = '執行敘明稱', args = variable)
- 參數說明
- target: 指定執行的函式(工作)
- name: 設定執行緒的名稱
- args: 欲帶入函式的參數,但要以list的形式傳入
- 程式碼範例1:
## 導入套件 import threading ## 新建的執行緒將執行此函數(工作) def added_thread_job(): print('新增加的執行續: ', threading.current_thread()) print('新增加的執行續名稱: ', threading.current_thread().name) print('活動中的執行續數量: ', threading.active_count()) ## 創建新執行緒 def added_threading_example(): ## 新稱的執行緒 added_thread = threading.Thread(target = added_thread_job, name = 'new_added_thread') ## 啟動執行緒 added_thread.start() if __name__ == '__main__': added_threading_example()
執行結果
新增加的執行續: <Thread(new_added_thread, started 17184)> 新增加的執行續名稱: new_added_thread 活動中的執行續數量: 6
- 程式碼範例2: 帶入參數的用法
import threading def added_thread_job(a): ## 印出傳入的參數 print(a) print('新增加的執行續: ', threading.current_thread()) print('新增加的執行續名稱: ', threading.current_thread().name) print('活動中的執行續數量: ', threading.active_count()) def added_threading_example(): ## 欲傳入added_thread_job的參數 text = ['Threading Learning'] ## 新稱執行緒 added_thread = threading.Thread(target = added_thread_job, name = 'new_added_thread', args = text) ## 啟動執行緒 added_thread.start() if __name__ == '__main__': added_threading_example()
執行結果
Threading Learning 新增加的執行續: <Thread(new_added_thread, started 11884)> 新增加的執行續名稱: new_added_thread 活動中的執行續數量: 6
8. 執行創建的執行緒(thread)方法
- 函式功能介紹:
- start(): 啟動執行緒,執行工作
- join(): 等到執行緒終止後, 才會往下執行程式碼
- isAlive(): 檢查執行緒是否還在執行
- getName(): 取得thread名稱
- setName(): 設定thread名稱
1. 基本用法
- 範例: .start()、isAlive()、setName()、getName()用法
提醒: 每次執行結果不同,是因為每次執行緒的執行時間可能不同,所以會有先後執行的問題
import threading def added_thread_job(): print('新增加的執行續: ', threading.current_thread()) print('新增加的執行續名稱: ', threading.current_thread().name) print('活動中的執行續數量: ', threading.active_count()) def added_threading_example(): ## 新增執行緒 added_thread = threading.Thread(target = added_thread_job, name = 'new_added_thread_1') ## 設定thread名稱 added_thread.setName('new_added_thread_2') ## 取得thread名稱 print(added_thread.getName()) ## 啟動執行緒 added_thread.start() ## 檢查執行緒是否還在執行 print(added_thread.isAlive()) if __name__ == '__main__': added_threading_example()
執行結果
new_added_thread_2 新增加的執行續: <Thread(new_added_thread_2, started 3056)> 新增加的執行續名稱: new_added_thread_2 活動中的執行續數量: True 6
2. join()用法 - 解決執行緒還沒執行完,就先往下執行下一段程式的方法
我會透過下面的例子來帶大家了解join的用法,以及有沒有使用join的差別
- 程式碼範例1: 未使用join的狀況下
重點: 還沒加上.join(),遇到的問題
從下面的執行結果可以看出,執行緒還未執行完added_thread_job()(工作),程式就先執行了print('Next Code'),而我們要的應該事先執行完added_thread_job(),再執行print('Next Code'), 再執行print('Next Code'), 這是因為執行時間所造成的影響
## 導入套件 import threading import time def added_thread_job(): print("Thread Start") ## 執行工作, 工作內容要執行20次,每次要執行0.1秒,來將執行工作時間拉長 for i in range(20): time.sleep(0.1) print('execute job' + str(i)) print('Thread Finish') def added_thread_example(): ## 新建執行緒 added_thread = threading.Thread(target = added_thread_job, name = 'new_added_thread') ## 執行執行緒 added_thread.start() print('Next Code') if __name__ == '__main__': added_thread_example()
執行結果
Thread Start Next Code execute job0 execute job1 execute job2 execute job3 execute job4 execute job5 execute job6 execute job7 execute job8 execute job9 execute job10 execute job11 execute job12 execute job13 execute job14 execute job15 execute job16 execute job17 execute job18 execute job19 Thread Finish
- 程式範例2: 使用join()後的結果
重點:加上join()後的解決結果
執行結果可以看出,程式會先等待我們新增的執行緒執行完後,再執行下一段程式
## 導入套件 import threading import time def added_thread_job(): print("Thread Start") ## 執行工作, 工作內容要執行20次,每次要執行0.1秒,來將執行工作時間拉長 for i in range(20): time.sleep(0.1) print('execute job' + str(i)) print('Thread Finish') def added_thread_example(): ## 新建執行緒 added_thread = threading.Thread(target = added_thread_job, name = 'new_added_thread') ## 執行執行緒 added_thread.start() ## 等到此執行緒執行完 added_thread.join() print('Next Code') if __name__ == '__main__': added_thread_example()
執行結果
Thread Start execute job0 execute job1 execute job2 execute job3 execute job4 execute job5 execute job6 execute job7 execute job8 execute job9 execute job10 execute job11 execute job12 execute job13 execute job14 execute job15 execute job16 execute job17 execute job18 execute job19 Thread Finish Next Code
補充: 兩個執行緒一起執行的狀況 這邊我把第一條執行緒.join放在執行print("Next Code")前執行完畢,由於第二條執行緒執行的工作少於第一條執行緒,所以會在執行完第一條執行緒前執行完畢 實驗:大家可以根據自己的調整.join的位置
- 程式範例:
## 導入套件 import threading import time ## 第一條執行緒執行的工作 def added_thread_job1(): print("Thread 1 Start") ## 執行工作, 工作內容要執行5次,每次要執行0.1秒,來將執行工作時間拉長 for i in range(5): time.sleep(0.1) print('execute job' + str(i)) print('Thread 1 Finish') ## 第二條執行緒執行的工作 def added_thread_job2(): print("Thread 2 Start") ## 執行工作, 工作內容要執行2次,每次要執行0.1秒,來將執行工作時間拉長 for i in range(2): time.sleep(0.1) print('execute job' + str(i)) print('Thread 2 Finish') def added_thread_example(): ## 新建執行緒 added_thread1 = threading.Thread(target = added_thread_job1, name = 'new_added_thread1') added_thread2 = threading.Thread(target = added_thread_job2, name = 'new_added_thread2') ## 執行第一條執行緒 added_thread1.start() ## 執行第二條執行緒 added_thread2.start() ## 等到第一條執行緒執行完 added_thread1.join() ## 等到第二條執行緒執行完 added_thread2.join() print('Next Code') if __name__ == '__main__': added_thread_example()
執行結果
Thread 1 Start Thread 2 Start execute job0 execute job0 execute job1 execute job1 Thread 2 Finish execute job2 execute job3 execute job4 Thread 1 Finish Next Code
3. Queue 佇列
- 說明: 用來儲存多執行緒的個別運算結果,最後再從Queue中取得最終結果
重要: 由於多執行緒(或稱多線程),不能像平常那樣使用return來回傳函數(function)結果,所以需要用到Queue的方式來取得函數(function)的結果
- Queue的使用方法
- 首先: 我們需要導入Queue的套件: from queue import Queue
- 再來: q.put()來回傳函數(function)結果到Queue中,與return的意思是一樣的
- 最後: q.get()按順序從序列中取得結果值,一次只取一個值
- Queue的實作步驟範例:
Step1: 導入Queue的套件
from queue import Queue
Step2: 建立執行緒欲執行的函數(工作)
重要提醒: 由於Queue沒有return這個方法,所以這邊需要使用queue.put()來取的回傳值
## 定義執行緒欲直行的函數(工作) def thread_job(l,q): ## 將資料集做平方運算 for i in range(len(l)): l[i] = l[i]**2 ## 回傳值: 回傳到Queue中 q.put(l)
Step3: 定義多執行緒(Multi-Threading)函數
a.
## 定義多執行緒的函數 def multi_threading(): ## 啟用Queue佇列 q = Queue() ## 用來裝所有創建的執行緒 threads = [] ## 自行定義的數據集 data = [[2,4,8],[2,6,10],[3,7,9],[8,9,3],[5,5,6]]
說明: "q = Queue()" 我將q定義為Queue()佇列,表示之後使用q.put(),就會回傳到q這個佇列之中
b.
由於我的資料有五組,所以我定義五個執行緒來運算,並將這些新定義的執行緒(Thread)放入thread串列裡
## 定義五個執行緒,並放入threads裡 for d in range(5): ## 定義執行緒 t = threading.Thread(target = thread_job, args = (data[d], q)) ## 啟用執行緒 t.start() ## 放入threads裡 threads.append(t)
c.
使用.join()將五個執行緒家道主執行緒之中,並確保它們都有執行完畢,才進行下一步
## 使用.join()將五個執行緒家道主執行緒之中,並確保它們執行完畢,才進行下一步 for thread in threads: thread.join()
d.
創建一個串列(list)來裝五個執行緒執行完畢後保存於Queue佇列中的結果,使用q.get()來將它們取出佇列Queue
## 創建一個串列(list)來裝載結果 results = [] ## 將Queue中的結果取回 for r in range(5): results.append(q.get()) print(results)
完整程式碼
## 導入套件 import threading import time ## 導入Queue所需的套件 from queue import Queue ## 定義執行緒欲直行的函數(工作) def thread_job(l,q): ## 將資料集做平方運算 for i in range(len(l)): l[i] = l[i]**2 ## 回傳值: 回傳到Queue中 q.put(l) ## 定義多執行緒的函數 def multi_threading(): ## 啟用Queue佇列 q = Queue() ## 用來裝所有創建的執行緒 threads = [] ## 自行定義的數據集 data = [[2,4,8],[2,6,10],[3,7,9],[8,9,3],[5,5,6]] ## 定義五個執行緒,並放入threads裡 for d in range(5): ## 定義執行緒 t = threading.Thread(target = thread_job, args = (data[d], q)) ## 啟用執行緒 t.start() ## 放入threads裡 threads.append(t) ## 使用.join()將五個執行緒家道主執行緒之中,並確保它們執行完畢,才進行下一步 for thread in threads: thread.join() ## 創建一個串列(list)來裝載結果 results = [] ## 將Queue中的結果取回 for r in range(5): results.append(q.get()) print(results) if __name__ == '__main__': multi_threading()
執行結果
[[4, 16, 64], [4, 36, 100], [9, 49, 81], [64, 81, 9], [25, 25, 36]]
4. GIL(Global Interpreter Lock)
前面有提到受到GIL(Global Interpreter Lock )限制,Python的多執行緒(Multi-Threading)執行速度並沒有比較快,這邊我們來測試一下是否是真的
- 程式碼範例: 我一個採用多執行緒(Multi-Threading),一個不採用,然後比較兩者的執行速度
## 導入套件 import threading import time import copy ## 導入Queue所需的套件 from queue import Queue ## 多執行緒欲直行的函數(工作) def thread_job(l,q): ## 計算數據串列中的總和 total = sum(l) ## 回傳結果 q.put(total) ## 定義多執行緒的函數 def multi_threading(l): ## 啟用Queue佇列 q = Queue() ## 用來裝執行緒 threads = [] ## 定義五個執行緒,並放入threads裡 for d in range(5): ## 定義執行緒 t = threading.Thread(target = thread_job, args = (copy.copy(l), q), name = 'Thread %d' % d) ## 啟用執行緒 t.start() ## 放入threads裡 threads.append(t) ## 使用.join()將這五個執行緒加到主執行緒之中,並確保它們都有執行完畢,才進行下一步 for thread in threads: thread.join() ## 創建一個串列(list)來裝結果 results = [] ## 將Queue中的運算結果取回 for r in range(5): results.append(q.get()) print('Multi Threading Method: ', results[0]) ## 定義不使用Multi-Threading的函數 def normal_method(l): total = sum(l) print('Normal Method: ', total) if __name__ == '__main__': ## 創建一組數據串列 l = list(range(1000000)) ## 記下當下的時間 c_t = time.time() ## 執行一般運算的方法 normal_method(l*5) ## 紀錄執行時間 nt = time.time() - c_t ## 印出一般的執行時間 print('Normal Time: ', nt) ## 記下當下時間 c_t1 = time.time() ## 執行Multi-Threading運算 multi_threading(l) ## 紀錄執行的時間 mt = time.time() - c_t1 ## 印出Multi Threading的執行時間 print('Multi-Threading: ', mt) ## 一般與多執行緒的執行時間差 print('Normal Time - Multi-Threading: ', nt - mt)
為什麼normal_method(l*5)要把數據乘以5說明: 因為我們使用五個執行緒同時執行同一筆數據,就等同於一般狀況下做五次,所以要乘5,才能比較運算速度喔
執行結果
Normal Method: 2499997500000 Normal Time: 0.15358924865722656 Multi Threading Method: 499999500000 Multi-Threading: 0.1486034393310547 Normal Time - Multi-Threading: 0.004985809326171875
結果: 理論來說,我們使用五個執行緒來執行,所運算的時間應該要只有一般狀況下的1/5,但從執行結果來看,Multi-Threading在速度上並沒有比一般快,或只有快一點點
5. Lock 鎖住
- 使用時機在使用多執行緒(Multi-Threading)時,每個執行緒都會同時執行,但有時候我們不能讓多個執行緒同時執行,像是我們不希望同時寫入檔案,這樣可能會造成錯亂,簡單來說就是我們希望執行緒等待上一個執行緒完成工作後,才能進行工作,一次只允許一個執行緒執行工作,這時我們就會使用lock
- 兩個重要的lock使用方法
- i. Lock.acquire: 當執行緒使用acquire時,就會獲得執行的權利,這時候只有它能夠執行,其他執行緒要等它執行完畢,才能執行
- ii. Lock.release: 當執行緒使用release,就會釋放執行的權利,讓下一個呼叫acquire的執行緒獲取執行權利
- 程式碼範例1: 未使用lock的狀況
## 導入套件 import threading ## 定義兩個函數(工作) 分別由兩個執行緒來執行 def thread_job1(): ## 將result 定義為全域變數 global result ## 執行20次,每次將result值加1 for i in range(20): result += 1 print("thread job1: ", result) def thread_job2(): ## 將result 定義為全域變數 global result ## 執行20次,每次將result值加1 for i in range(20): result += 2 print("thread job2: ", result) if __name__ == '__main__': result = 0 ## 定義兩條執行緒,分別執行兩種函數(工作) thread1 = threading.Thread(target = thread_job1) thread2 = threading.Thread(target = thread_job2) ## 執行 thread1.start() thread2.start() ## 執行完畢 thread1.join() thread2.join()
執行結果
thread job1: 1 thread job1: 2 thread job1: 3 thread job1: 4 thread job1: 5 thread job1: 6 thread job1: 7 thread job1: thread job2: 10 8 thread job1: 11 thread job1: 12 thread job1: 13thread job2: 15 thread job2: 17 thread job2: 19 thread job2: thread job1: 22 thread job1: 23 thread job1: 24 thread job1: 25 thread job1: 26 thread job1: 27 thread job1: 28 thread job1: 29 thread job1: 30 21 thread job2: 32 thread job2: 34 thread job2: 36 thread job2: 38 thread job2: 40 thread job2: 42 thread job2: 44 thread job2: 46 thread job2: 48 thread job2: 50 thread job2: 52 thread job2: 54 thread job2: 56 thread job2: 58 thread job2: 60
重點: 大家會發現每次執行的結果都不同,執行結果也顯示的非常亂,所以接下來我們要使用Lock來解決這個問題
- 程式碼範例2: 使用threading.Lock()來實現lock的方法
## 導入套件 import threading ## 定義兩個函數(工作) 分別由兩個執行緒來執行 def thread_job1(): ## 將lock、result定義為全域變數 global lock, result ## 獲取執行權 lock.acquire() ## 執行20次,每次將result值加1 for i in range(20): result += 1 print("thread job1: ", result) ## 釋放執行權 lock.release() def thread_job2(): ## 將locl、result定義為全域函數 global lock, result ## 獲取執行權 lock.acquire() ## 執行20次,每次將result值加2 for i in range(20): result +=2 print("thread job2: ", result) ## 釋放執行權 lock.release() if __name__ == '__main__': ## 定義lock的方法 lock = threading.Lock() result = 0 ## 定義兩條執行緒,分別執行兩種函數(工作) thread1 = threading.Thread(target = thread_job1) thread2 = threading.Thread(target = thread_job2) ## 執行 thread1.start() thread2.start() ## 執行完畢 thread1.join() thread2.join()
執行結果
thread job1: 1 thread job1: 2 thread job1: 3 thread job1: 4 thread job1: 5 thread job1: 6 thread job1: 7 thread job1: 8 thread job1: 9 thread job1: 10 thread job1: 11 thread job1: 12 thread job1: 13 thread job1: 14 thread job1: 15 thread job1: 16 thread job1: 17 thread job1: 18 thread job1: 19 thread job1: 20 thread job2: 22 thread job2: 24 thread job2: 26 thread job2: 28 thread job2: 30 thread job2: 32 thread job2: 34 thread job2: 36 thread job2: 38 thread job2: 40 thread job2: 42 thread job2: 44 thread job2: 46 thread job2: 48 thread job2: 50 thread job2: 52 thread job2: 54 thread job2: 56 thread job2: 58 thread job2: 60
6. Semaphore旗標
- 說明: 功能類似於Lock,但Lock一次只允許一個執行緒進行工作,而Semaphore允許多個執行緒同時工作,但要限制數量,它也是用acquire來獲取執行權,release來釋放執行權,但不同的是Semaphore在執行這兩個函數時,多了一個計數器的概念,當acquire時會-1,release時會+1,減到為0時,下一個執行緒就需要正在執行工作執行緒release釋放權限後,才能執行
- 使用時機: 因為系統的資源有限,像是CPU或記憶體限制,在執行很耗資源的程式時,我們希望限制同時執行工作的執行緒數量,才不會造成系統執行很慢
- 使用方法: threading.Semaphore()函數裡面的參數為限制同時執行的執行緒數量,像是threading.Semaphore(2),代表同時限制兩個執行緒執行
- 程式碼範例:
## 導入套件 import threading import time from queue import Queue ## 定義一個類別 class Worker(threading.Thread): ## 初始化 def __init__(self, queue, name, semaphore): ## 建立執行緒 threading.Thread.__init__(self) self.queue = queue self.name = name self.semaphore = semaphore ## 執行的工作 def run(self): ## 當Queue中還有資料沒有被拿出來時 while self.queue.qsize() > 0: ## 取出Queue中的資料 msg = self.queue.get() ## 取得執行權 semaphore.acquire() print("%s acquire the Semaphore" % self.name) ## 只允許有限的執行緒同時執行 print("%s get %s from my_queue" % (self.name, msg)) time.sleep(1) ## 釋放執行權 print("%s released the Semaphohre" % self.name) self.semaphore.release() if __name__ == '__main__': ## 啟用Queue 佇列 my_queue = Queue() ## 將數據塞進Queue中 for i in range(6): my_queue.put("Data %d" % i) ## 建立旗標,並限制同時只可以有兩個執行緒獲取執行權 semaphore = threading.Semaphore(2) ## 建立類別,同時建立執行緒 person1 = Worker(my_queue, 'Jack', semaphore) person2 = Worker(my_queue, 'Tom', semaphore) person3 = Worker(my_queue, 'Cathy', semaphore) ## 啟動執行緒 person1.start() person2.start() person3.start()
提醒: 這邊執行的函數名稱一定要使用run才會執行
執行結果
Jack acquire the Semaphore Jack get Data 0 from my_queue Tom acquire the Semaphore Tom get Data 1 from my_queue Jack released the Semaphohre Jack acquire the Semaphore Jack get Data 3 from my_queue Tom released the Semaphohre Tom acquire the Semaphore Tom get Data 4 from my_queue Jack released the Semaphohre Jack acquire the Semaphore Jack get Data 5 from my_queue Tom released the Semaphohre Cathy acquire the Semaphore Cathy get Data 2 from my_queue Jack released the Semaphohre Cathy released the Semaphohre
7. RLock
- 說明: 為一個可以重複取得執行權的方法,與Lock功能類似,但是它可以允許同一個執行緒,重複取得執行的權利,RLock也有計數器的概念,但與Semaphore不同的點在於acquire獲取權限的時候會+1,release釋放權限時會-1,剛好與Semaphore相反,當遞減到0時,才會真的釋放執行權,大於0的時候,其它執行緒都不能獲得執行權
- 例子說明:
這邊我使用lock來示範,如下面,當重複acquire一次時,因為執行權已經被鎖定了,所以會被擋住
## 啟動lock lock = threading.Lock() ## 獲取執行權 lock.acquire() ## 再次獲取執行權時,會被擋住 lock.acquire()
使用RLock就不會被擋住囉!!
## 啟動RLock rlock = threading.RLock() ## 取得執行權 rlock.acquire() ## 重複取得執行權,不會被擋住 rlock.acquire() ## 釋放執行權 rlock.release ## 再次釋放執行權 rlock.release()
介紹完了Python的多執行緒(Multi-Threading)-Thread套件的使用方法,大家是否已經躍躍欲試了,但受到GIL(Global Interpreter Lock)的限制,這樣的方式在Python中執行速度不一定會比較快,所以下一次我會帶大家跟我一起學習多進程Multi-Processing的使用方法喔~~
Reference
https://blog.gtwang.org/programming/python-threading-multithreaded-programming-tutorial/
Like my work? Don't forget to support and clap, let me know that you are with me on the road of creation. Keep this enthusiasm together!