1. python多線程幾種方法實現
Python進階(二十六)-多線程實現同步的四種方式
臨界資源即那些一次只能被一個線程訪問的資源,典型例子就是列印機,它一次只能被一個程序用來執行列印功能,因為不能多個線程同時操作,而訪問這部分資源的代碼通常稱之為臨界區。
鎖機制
threading的Lock類,用該類的acquire函數進行加鎖,用realease函數進行解鎖
import threadingimport timeclass Num:
def __init__(self):
self.num = 0
self.lock = threading.Lock() def add(self):
self.lock.acquire()#加鎖,鎖住相應的資源
self.num += 1
num = self.num
self.lock.release()#解鎖,離開該資源
return num
n = Num()class jdThread(threading.Thread):
def __init__(self,item):
threading.Thread.__init__(self)
self.item = item def run(self):
time.sleep(2)
value = n.add()#將num加1,並輸出原來的數據和+1之後的數據
print(self.item,value)for item in range(5):
t = jdThread(item)
t.start()
t.join()#使線程一個一個執行
當一個線程調用鎖的acquire()方法獲得鎖時,鎖就進入「locked」狀態。每次只有一個線程可以獲得鎖。如果此時另一個線程試圖獲得這個鎖,該線程就會變為「blocked」狀態,稱為「同步阻塞」(參見多線程的基本概念)。
直到擁有鎖的線程調用鎖的release()方法釋放鎖之後,鎖進入「unlocked」狀態。線程調度程序從處於同步阻塞狀態的線程中選擇一個來獲得鎖,並使得該線程進入運行(running)狀態。
信號量
信號量也提供acquire方法和release方法,每當調用acquire方法的時候,如果內部計數器大於0,則將其減1,如果內部計數器等於0,則會阻塞該線程,知道有線程調用了release方法將內部計數器更新到大於1位置。
import threadingimport timeclass Num:
def __init__(self):
self.num = 0
self.sem = threading.Semaphore(value = 3) #允許最多三個線程同時訪問資源
def add(self):
self.sem.acquire()#內部計數器減1
self.num += 1
num = self.num
self.sem.release()#內部計數器加1
return num
n = Num()class jdThread(threading.Thread):
def __init__(self,item):
threading.Thread.__init__(self)
self.item = item def run(self):
time.sleep(2)
value = n.add()
print(self.item,value)for item in range(100):
2. python實現多線程並發執行
由於停服維護的需求(服務越來越多的原因),此前編寫的shell腳本執行速度緩慢(for循環,這就會很慢),為提高執行速度,參考很多資料,完成此腳本,實現並發執行機制.(當然這是測試腳本,有需要的同學,拿去改ba改ba,應該就可以用了)
此處腳本參考了 https://www.jb51.net/article/86053.htm
3. python多線程並行計算通過向線程池ThreadPoolExecutor提交任務的實現方法
Python的線程池可以有效地控制系統中並發線程的數量。
當程序中需要創建許多生存期較短的線程執行運算任務時,首先考慮使用線程池。線程池任務啟動時會創建出最大線程數參數 max_workers 指定數量的空閑線程,程序只要將執行函數提交給線程池,線程池就會啟動一個空閑的線程來執行它。當該函數執行結束後,該線程並不會死亡,而是再次返回到線程池中變成空閑狀態,等待執行下一個函數。配合使用 with 關鍵字實現任務隊列完成後自動關閉線程池釋放資源。
4. python多線程並發數量控制
python多線程如果不進行並發數量控制,在啟動線程數量多到一定程度後,會造成線程無法啟動的錯誤。
控制多線程並發數量的方法有好幾鍾,下面介紹用queue控制多線程並發數量的方法。python3
5. Python的多進程模塊multiprocessing
眾所周知,Python中不存在真正的多線程,Python中的多線程是一個並發過程。如果想要並行的執行程序,充分的利用cpu資源(cpu核心),還是需要使用多進程解決的。其中multiprocessing模塊應該是Python中最常用的多進程模塊了。
基本上multiprocessing這個模塊和threading這個模塊用法是相同的,也是可以通過函數和類創建進程。
上述案例基本上就是筆者搬用了上篇文章多線程的案例,可見其使用的相似之處。導入multiprocessing後實例化Process就可以創建一個進程,參數的話也是和多線程一樣,target放置進程執行函數,args存放該函數的參數。
使用類來創建進程也是需要先繼承multiprocessing.Process並且實現其init方法。
Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會創建一個新的進程用來執行該請求。
但如果池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,才會創建新的進程。
需要注意的是,在調用join方法阻塞進程前,需要先調用close方法,,否則程序會出錯。
在上述案例中,提到了非阻塞,當把創建進程的方法換為pool.apply(func, (msg,))時,就會阻塞進程,出現下面的狀況。
在multiprocessing模塊中還存在Queue對象,這是一個進程的安全隊列,近似queue.Queue。隊列一般也是需要配合多線程或者多進程使用。
下列案例是一個使用進程隊列實現的生產者消費者模式。
multiprocessing支持兩種進程間的通信,其中一種便是上述案例的隊列,另一種則稱作管道。在官方文檔的描述中,multiprocessing中的隊列是基於管道實現的,並且擁有更高的讀寫效率。
管道可以理解為進程間的通道,使用Pipe([plex])創建,並返回一個元組(conn1,conn2)。如果plex被置為True(默認值),那麼該管道是雙向的,如果plex被置為False,那麼該管道是單向的,即conn1隻能用於接收消息,而conn2僅能用於發送消息。
其中conn1、conn2表示管道兩端的連接對象,每個連接對象都有send()和recv()方法。send和recv方法分別是發送和接受消息的方法。例如,可以調用conn1.send發送消息,conn1.recv接收消息。如果沒有消息可接收,recv方法會一直阻塞。如果管道已經被關閉,那麼recv方法會拋出EOFError。
關於multiprocessing模塊其實還有很多實用的類和方法,由於篇幅有限(懶),筆者就先寫到這里。該模塊其實用起來很像threading模塊,像鎖對象和守護線程(進程)等multiprocessing模塊也是有的,使用方法也近乎相同。
如果想要更加詳細的了解multiprocessing模塊,請參考官方文檔。
6. Python中的多進程與多線程/分布式該如何使用
Python提供了非常好用的多進程包multiprocessing,你只需要定義一個函數,Python會替你完成其他所有事情。
藉助這個包,可以輕松完成從單進程到並發執行的轉換。
1、新建單一進程
如果我們新建少量進程,可以如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
p = multiprocessing.Process(target=func, args=("hello", ))
p.start()
p.join()
print "Sub-process done."12345678910111213
2、使用進程池
是的,你沒有看錯,不是線程池。它可以讓你跑滿多核CPU,而且使用方法非常簡單。
注意要用apply_async,如果落下async,就變成阻塞版本了。
processes=4是最多並發進程數量。
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
for i in xrange(10):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
pool.close()
pool.join()
print "Sub-process(es) done."12345678910111213141516
3、使用Pool,並需要關注結果
更多的時候,我們不僅需要多進程執行,還需要關注每個進程的執行結果,如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
return "done " + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
result = []
for i in xrange(10):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg, )))
pool.close()
pool.join()
for res in result:
print res.get()
print "Sub-process(es) done."
2014.12.25更新
根據網友評論中的反饋,在Windows下運行有可能崩潰(開啟了一大堆新窗口、進程),可以通過如下調用來解決:
multiprocessing.freeze_support()1
附錄(自己的腳本):
#!/usr/bin/python
import threading
import subprocess
import datetime
import multiprocessing
def dd_test(round, th):
test_file_arg = 'of=/zbkc/test_mds_crash/1m_%s_%s_{}' %(round, th)
command = "seq 100 | xargs -i dd if=/dev/zero %s bs=1M count=1" %test_file_arg
print command
subprocess.call(command,shell=True,stdout=open('/dev/null','w'),stderr=subprocess.STDOUT)
def mds_stat(round):
p = subprocess.Popen("zbkc mds stat", shell = True, stdout = subprocess.PIPE)
out = p.stdout.readlines()
if out[0].find('active') != -1:
command = "echo '0205pm %s round mds status OK, %s' >> /round_record" %(round, datetime.datetime.now())
command_2 = "time (ls /zbkc/test_mds_crash/) 2>>/round_record"
command_3 = "ls /zbkc/test_mds_crash | wc -l >> /round_record"
subprocess.call(command,shell=True)
subprocess.call(command_2,shell=True)
subprocess.call(command_3,shell=True)
return 1
else:
command = "echo '0205 %s round mds status abnormal, %s, %s' >> /round_record" %(round, out[0], datetime.datetime.now())
subprocess.call(command,shell=True)
return 0
#threads = []
for round in range(1, 1600):
pool = multiprocessing.Pool(processes = 10) #使用進程池
for th in range(10):
# th_name = "thread-" + str(th)
# threads.append(th_name) #添加線程到線程列表
# threading.Thread(target = dd_test, args = (round, th), name = th_name).start() #創建多線程任務
pool.apply_async(dd_test, (round, th))
pool.close()
pool.join()
#等待線程完成
# for t in threads:
# t.join()
if mds_stat(round) == 0:
subprocess.call("zbkc -s",shell=True)
break
7. Python中級精華-並發之啟動和停止線程
為了讓代碼能夠並發執行,向創建線程並在核實的時候銷毀它。
由於目的比較單純,只是講解基礎的線程創建方法,所以可以直接使用threading庫中的Thread類來實例化一個線程對象。
例子,用戶輸入兩個數字,並且求其兩個數字的四則運算的結果:
除了以上的一些功能以外,在python線程
中沒有其他的諸如給線程發信號、設置線程調度屬性、執行任何其他高級操作的功能了,如果需要這些功能,就需要手工編寫了。
另外,需要注意的是,由於GIL(全局解釋器鎖)的存在,限制了在python解釋器當中只允許運行一個線程。基於這個原因,不停該使用python線程來處理計算密集型的任務,因為在這種任務重我們希望在多個CPU核心上實現並行處理。Python線程更適合於IO處理以及設計阻塞操作的並發執行任務(即等待IO響應或等待資料庫取出結果等)。
如何判斷線程是否已經啟動?
目的:我們載入了一個線程,但是想要知道這個線程什麼時候才會開始運行?
方法:
線程的核心特徵我認為就是不確定性,因為其什麼時候開始運行,什麼時候被打斷,什麼時候恢復執行,這不是程序員能夠控制的,而是有系統調度
來完成的。如果遇到像某個線程的運行依託於其他某個線程運行到某個狀態時該線程才能開始運行,那麼這就是線程同步
問題,同樣這個問題非常棘手。要解決這類問題我們要藉助threading中的Event對象。
Event其實和條件標記類似,勻速線程
等待某個時間發生。初始狀態時事件被設置成0。如果事件沒有被設置而線程正在等待該事件,那麼線程就會被阻塞,直到事件被設置位置,當有線程設置了這個事件之後,那麼就會喚醒正在等待事件的線程,如果線程等待的事件已經設置了,那麼線程會繼續執行。
一個例子:
如上能夠確定的是,主線程會在線程t運行結束時再運行。
8. Python如何實現並行的多線程
Python中使用線程有兩種方式:函數或者用類來包裝線程對象。函數式:調用thread模塊中的start_new_thread()函數來產生新線程。線程模塊:Python通過兩個標准庫thread和threading提供對線程的支持。
9. python stackless 怎麼多線程並發
1 介紹
1.1 為什麼要使用Stackless
摘自stackless網站。
Note
Stackless Python 是Python編程語言的一個增強版本,它使程序員從基於線程的編程方式中獲得好處,並避免傳統線程所帶來的性能與復雜度問題。Stackless為 Python帶來的微線程擴展,是一種低開銷、輕量級的便利工具,如果使用得當,可以獲益如下:
改進程序結構
增進代碼可讀性
提高編程人員生產力
以上是Stackless Python很簡明的釋義,但其對我們意義何在?——就在於Stackless提供的並發建模工具,比目前其它大多數傳統編程語言所提供的,都更加易用: 不僅是Python自身,也包括Java、C++,以及其它。盡管還有其他一些語言提供並發特性,可它們要麼是主要用於學術研究的(如 Mozart/Oz),要麼是罕為使用、或用於特殊目的的專業語言(如Erlang)。而使用stackless,你將會在Python本身的所有優勢之 上,在一個(但願)你已經很熟悉的環境中,再獲得並發的特性。
這自然引出了個問題:為什麼要並發?
1.1.1 現實世界就是並發的
現實世界就是「並發」的,它是由一群事物(或「演員」)所組成,而這些事物以一種對彼此所知有限的、鬆散耦合的方式相互作用。傳說中面向對象編程有 一個好處,就是對象能夠對現實的世界進行模擬。這在一定程度上是正確的,面向對象編程很好地模擬了對象個體,但對於這些對象個體之間的交互,卻無法以一種 理想的方式來表現。例如,如下代碼實例,有什麼問題?
第一印象,沒問題。但是,上例中存在一個微妙的安排:所有事件是次序發生的,即:直到丈夫吃完飯,妻子才開始吃;兒子則一直等到母親吃完才吃;而女 兒則是最後一個。在現實世界中,哪怕是丈夫還堵車在路上,妻子、兒子和女兒仍然可以該吃就吃,而要在上例中的話,他們只能餓死了——甚至更糟:永遠沒有人 會知道這件事,因為他們永遠不會有機會拋出一個異常來通知這個世界!
1.1.2 並發可能是(僅僅可能是)下一個重要的編程範式
我個人相信,並發將是軟體世界裡的下一個重要範式。隨著程序變得更加復雜和耗費資源,我們已經不能指望摩爾定律來每年給我們提供更快的CPU了,當 前,日常使用的個人計算機的性能提升來自於多核與多CPU機。一旦單個CPU的性能達到極限,軟體開發者們將不得不轉向分布式模型,靠多台計算機的互相協 作來建立強大的應用(想想GooglePlex)。為了取得多核機和分布式編程的優勢,並發將很快成為做事情的方式的事實標准。
1.2 安裝stackless
安裝Stackless的細節可以在其網站上找到。現在Linux用戶可以通過SubVersion取得源代碼並編譯;而對於Windows用戶, 則有一個.zip文件供使用,需要將其解壓到現有的Python安裝目錄中。接下來,本教程假設Stackless Python已經安裝好了,可以工作,並且假設你對Python語言本身有基本的了解。
2 stackless起步
本章簡要介紹了stackless的基本概念,後面章節將基於這些基礎,來展示更加實用的功能。
2.1 微進程(tasklet)
微進程是stackless的基本構成單元,你可以通過提供任一個Python可調用對象(通常為函數或類的方法)來建立它,這將建立一個微進程並將其添加到調度器。這是一個快速演示:
注意,微進程將排起隊來,並不運行,直到調用stackless.run()。
2.2 調度器(scheler)
調度器控制各個微進程運行的順序。如果剛剛建立了一組微進程,它們將按照建立的順序來執行。在現實中,一般會建立一組可以再次被調度的微進程,好讓每個都有輪次機會。一個快速演示:
注意:當調用stackless.schele()的時候,當前活動微進程將暫停執行,並將自身重新插入到調度器隊列的末尾,好讓下一個微進程被執行。一旦在它前面的所有其他微進程都運行過了,它將從上次 停止的地方繼續開始運行。這個過程會持續,直到所有的活動微進程都完成了運行過程。這就是使用stackless達到合作式多任務的方式。
2.3 通道(channel)
通道使得微進程之間的信息傳遞成為可能。它做到了兩件事:
能夠在微進程之間交換信息。
能夠控制運行的流程。
又一個快速演示:
接收的微進程調用channel.receive()的時候,便阻塞住,這意味著該微進程暫停執行,直到有信息從這個通道送過來。除了往這個通道發送信息以外,沒有其他任何方式可以讓這個微進程恢復運行。
若有其他微進程向這個通道發送了信息,則不管當前的調度到了哪裡,這個接收的微進程都立即恢復執行;而發送信息的微進程則被轉移到調度列表的末尾,就像調用了stackless.schele()一樣。
同樣注意,發送信息的時候,若當時沒有微進程正在這個通道上接收,也會使當前微進程阻塞:
發送信息的微進程,只有在成功地將數據發送到了另一個微進程之後,才會重新被插入到調度器中。
2.4 總結
以上涵蓋了stackless的大部分功能。似乎不多是吧?——我們只使用了少許對象,和大約四五個函數調用,來進行操作。但是,使用這種簡單的API作為基本建造單元,我們可以開始做一些真正有趣的事情。
3 協程(coroutine)
3.1 子常式的問題
大多數傳統編程語言具有子常式的概念。一個子常式被另一個常式(可能還是其它某個常式的子常式)所調用,或返回一個結果,或不返回結果。從定義上說,一個子常式是從屬於其調用者的。
見下例:
有經驗的編程者會看到這個程序的問題所在:它導致了堆棧溢出。如果運行這個程序,它將顯示一大堆討厭的跟蹤信息,來指出堆棧空間已經耗盡。
3.1.1 堆棧
我仔細考慮了,自己對C語言堆棧的細節究竟了解多少,最終還是決定完全不去講它。似乎,其他人對其所嘗試的描述,以及圖表,只有本身已經理解了的人才能看得懂。我將試著給出一個最簡單的說明,而對其有更多興趣的讀者可以從網上查找更多信息。
每當一個子常式被調用,都有一個「棧幀」被建立,這是用來保存變數,以及其他子常式局部信息的區域。於是,當你調用 ping() ,則有一個棧幀被建立,來保存這次調用相關的信息。簡言之,這個幀記載著 ping 被調用了。當再調用 pong() ,則又建立了一個棧幀,記載著 pong 也被調用了。這些棧幀是串聯在一起的,每個子常式調用都是其中的一環。就這樣,堆棧中顯示: ping 被調用所以 pong 接下來被調用。顯然,當 pong() 再調用 ping() ,則使堆棧再擴展。下面是個直觀的表示:
幀 堆棧
1 ping 被調用
2 ping 被調用,所以 pong 被調用
3 ping 被調用,所以 pong 被調用,所以 ping 被調用
4 ping 被調用,所以 pong 被調用,所以 ping 被調用,所以 pong 被調用
5 ping 被調用,所以 pong 被調用,所以 ping 被調用,所以 pong 被調用,所以 ping 被調用
6 ping 被調用,所以 pong 被調用,所以 ping 被調用,所以 pong 被調用,所以 ping 被調用……
現在假設,這個頁面的寬度就表示系統為堆棧所分配的全部內存空間,當其頂到頁面的邊緣的時候,將會發生溢出,系統內存耗盡,即術語「堆棧溢出」。
3.1.2 那麼,為什麼要使用堆棧?
上例是有意設計的,用來體現堆棧的問題所在。在大多數情況下,當每個子常式返回的時候,其棧幀將被清除掉,就是說堆棧將會自行實現清理過程。這一般 來說是件好事,在C語言中,堆棧就是一個不需要編程者來手動進行內存管理的區域。很幸運,Python程序員也不需要直接來擔心內存管理與堆棧。但是由於 Python解釋器本身也是用C實現的,那些實現者們可是需要擔心這個的。使用堆棧是會使事情方便,除非我們開始調用那種從不返回的函數,如上例中的,那 時候,堆棧的表現就開始和程序員別扭起來,並耗盡可用的內存。
3.2 走進協程
此時,將堆棧弄溢出是有點愚蠢的。 ping() 和 pong() 本不是真正意義的子常式,因為其中哪個也不從屬於另一個,它們是「協程」,處於同等的地位,並可以彼此間進行無縫通信。
幀 堆棧
1 ping 被調用
2 pong 被調用
3 ping 被調用
4 pong 被調用
5 ping 被調用
6 pong 被調用
在stackless中,我們使用通道來建立協程。還記得嗎,通道所帶來的兩個好處中的一個,就是能夠控制微進程之間運行的流程。使用通道,我們可以在 ping 和 pong 這兩個協程之間自由來回,要多少次就多少次,都不會堆棧溢出:
你可以運行這個程序要多久有多久,它都不會崩潰,且如果你檢查其內存使用量(使用Windows的任務管理器或Linux的top命令),將會發現 使用量是恆定的。這個程序的協程版本,不管運行一分鍾還是一天,使用的內存都是一樣的。而如果你檢查原先那個遞歸版本的內存用量,則會發現其迅速增長,直 到崩潰。
3.3 總結
是否還記得,先前我提到過,那個代碼的遞歸版本,有經驗的程序員會一眼看出毛病。但老實說,這裡面並沒有什麼「計算機科學」方面的原因在阻礙它的正 常工作,有些讓人堅信的東西,其實只是個與實現細節有關的小問題——只因為大多數傳統編程語言都使用堆棧。某種意義上說,有經驗的程序員都是被洗了腦,從 而相信這是個可以接受的問題。而stackless,則真正察覺了這個問題,並除掉了它。
4 輕量級線程
與當今的操作系統中內建的、和標准Python代碼中所支持的普通線程相比,「微線程」要更為輕量級,正如其名稱所暗示。它比傳統線程佔用更少的內存,並且微線程之間的切換,要比傳統線程之間的切換更加節省資源。
為了准確說明微線程的效率究竟比傳統線程高多少,我們用兩者來寫同一個程序。
4.1 hackysack模擬
Hackysack是一種游戲,就是一夥臟乎乎的小子圍成一個圈,來回踢一個裝滿了豆粒的沙包,目標是不讓這個沙包落地,當傳球給別人的時候,可以耍各種把戲。踢沙包只可以用腳。
在我們的簡易模擬中,我們假設一旦游戲開始,圈裡人數就是恆定的,並且每個人都是如此厲害,以至於如果允許的話,這個游戲可以永遠停不下來。
4.2 游戲的傳統線程版本