1. python多進程中隊列不空時阻塞,求解為什麼
最近接觸一個項目,要在多個虛擬機中運行任務,參考別人之前項目的代碼,採用了多進程來處理,於是上網查了查python中的多進程
一、先說說Queue(隊列對象)
Queue是python中的標准庫,可以直接import 引用,之前學習的時候有聽過著名的「先吃先拉」與「後吃先吐」,其實就是這里說的隊列,隊列的構造的時候可以定義它的容量,別吃撐了,吃多了,就會報錯,構造的時候不寫或者寫個小於1的數則表示無限多
import Queue
q = Queue.Queue(10)
向隊列中放值(put)
q.put(『yang')
q.put(4)
q.put([『yan','xing'])
在隊列中取值get()
默認的隊列是先進先出的
>>> q.get()
『yang'
>>> q.get()
4
>>> q.get()
[『yan', 『xing']
當一個隊列為空的時候如果再用get取則會堵塞,所以取隊列的時候一般是用到
get_nowait()方法,這種方法在向一個空隊列取值的時候會拋一個Empty異常
所以更常用的方法是先判斷一個隊列是否為空,如果不為空則取值
隊列中常用的方法
Queue.qsize() 返回隊列的大小
Queue.empty() 如果隊列為空,返回True,反之False
Queue.full() 如果隊列滿了,返回True,反之False
Queue.get([block[, timeout]]) 獲取隊列,timeout等待時間
Queue.get_nowait() 相當Queue.get(False)
非阻塞 Queue.put(item) 寫入隊列,timeout等待時間
Queue.put_nowait(item) 相當Queue.put(item, False)
二、multiprocessing中使用子進程概念
from multiprocessing import Process
可以通過Process來構造一個子進程
p = Process(target=fun,args=(args))
再通過p.start()來啟動子進程
再通過p.join()方法來使得子進程運行結束後再執行父進程
from multiprocessing import Process
import os
# 子進程要執行的代碼
def run_proc(name):
print 'Run child process %s (%s)...' % (name, os.getpid())
if __name__=='__main__':
print 'Parent process %s.' % os.getpid()
p = Process(target=run_proc, args=('test',))
print 'Process will start.'
p.start()
p.join()
print 'Process end.'
上面的程序運行後的結果其實是按照上圖中1,2,3分開進行的,先列印1,3秒後列印2,再3秒後列印3
代碼中的p.close()是關掉進程池子,是不再向裡面添加進程了,對Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),調用close()之後就不能繼續添加新的Process了。
當時也可以是實例pool的時候給它定義一個進程的多少
如果上面的代碼中p=Pool(5)那麼所有的子進程就可以同時進行
三、多個子進程間的通信
多個子進程間的通信就要採用第一步中說到的Queue,比如有以下的需求,一個子進程向隊列中寫數據,另外一個進程從隊列中取數據,
#coding:gbk
from multiprocessing import Process, Queue
import os, time, random
# 寫數據進程執行的代碼:
def write(q):
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
time.sleep(random.random())
# 讀數據進程執行的代碼:
def read(q):
while True:
if not q.empty():
value = q.get(True)
print 'Get %s from queue.' % value
time.sleep(random.random())
else:
break
if __name__=='__main__':
# 父進程創建Queue,並傳給各個子進程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動子進程pw,寫入:
pw.start()
# 等待pw結束:
pw.join()
# 啟動子進程pr,讀取:
pr.start()
pr.join()
# pr進程里是死循環,無法等待其結束,只能強行終止:
print
print '所有數據都寫入並且讀完'
四、關於上面代碼的幾個有趣的問題
if __name__=='__main__':
# 父進程創建Queue,並傳給各個子進程:
q = Queue()
p = Pool()
pw = p.apply_async(write,args=(q,))
pr = p.apply_async(read,args=(q,))
p.close()
p.join()
print
print '所有數據都寫入並且讀完'
如果main函數寫成上面的樣本,本來我想要的是將會得到一個隊列,將其作為參數傳入進程池子里的每個子進程,但是卻得到
RuntimeError: Queue objects should only be shared between processes through inheritance
的錯誤,查了下,大意是隊列對象不能在父進程與子進程間通信,這個如果想要使用進程池中使用隊列則要使用multiprocess的Manager類
if __name__=='__main__':
manager = multiprocessing.Manager()
# 父進程創建Queue,並傳給各個子進程:
q = manager.Queue()
p = Pool()
pw = p.apply_async(write,args=(q,))
time.sleep(0.5)
pr = p.apply_async(read,args=(q,))
p.close()
p.join()
print
print '所有數據都寫入並且讀完'
這樣這個隊列對象就可以在父進程與子進程間通信,不用池則不需要Manager,以後再擴展multiprocess中的Manager類吧
關於鎖的應用,在不同程序間如果有同時對同一個隊列操作的時候,為了避免錯誤,可以在某個函數操作隊列的時候給它加把鎖,這樣在同一個時間內則只能有一個子進程對隊列進行操作,鎖也要在manager對象中的鎖
#coding:gbk
from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random
# 寫數據進程執行的代碼:
def write(q,lock):
lock.acquire() #加上鎖
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
lock.release() #釋放鎖
# 讀數據進程執行的代碼:
def read(q):
while True:
if not q.empty():
value = q.get(False)
print 'Get %s from queue.' % value
time.sleep(random.random())
else:
break
if __name__=='__main__':
manager = multiprocessing.Manager()
# 父進程創建Queue,並傳給各個子進程:
q = manager.Queue()
lock = manager.Lock() #初始化一把鎖
p = Pool()
pw = p.apply_async(write,args=(q,lock))
pr = p.apply_async(read,args=(q,))
p.close()
p.join()
print
print '所有數據都寫入並且讀完'
2. Python中的多進程與多線程/分布式該如何使用
Python提供了非常好用的多進程包multiprocessing,你只需要定義一個函數,Python會替你完成其他所有事情。
藉助這個包,可以輕松完成從單進程到並發執行的轉換。
1、新建單一進程
如果我們新建少量進程,可以如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
p = multiprocessing.Process(target=func, args=("hello", ))
p.start()
p.join()
print "Sub-process done."12345678910111213
2、使用進程池
是的,你沒有看錯,不是線程池。它可以讓你跑滿多核CPU,而且使用方法非常簡單。
注意要用apply_async,如果落下async,就變成阻塞版本了。
processes=4是最多並發進程數量。
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
for i in xrange(10):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
pool.close()
pool.join()
print "Sub-process(es) done."12345678910111213141516
3、使用Pool,並需要關注結果
更多的時候,我們不僅需要多進程執行,還需要關注每個進程的執行結果,如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
return "done " + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
result = []
for i in xrange(10):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg, )))
pool.close()
pool.join()
for res in result:
print res.get()
print "Sub-process(es) done."
2014.12.25更新
根據網友評論中的反饋,在Windows下運行有可能崩潰(開啟了一大堆新窗口、進程),可以通過如下調用來解決:
multiprocessing.freeze_support()1
附錄(自己的腳本):
#!/usr/bin/python
import threading
import subprocess
import datetime
import multiprocessing
def dd_test(round, th):
test_file_arg = 'of=/zbkc/test_mds_crash/1m_%s_%s_{}' %(round, th)
command = "seq 100 | xargs -i dd if=/dev/zero %s bs=1M count=1" %test_file_arg
print command
subprocess.call(command,shell=True,stdout=open('/dev/null','w'),stderr=subprocess.STDOUT)
def mds_stat(round):
p = subprocess.Popen("zbkc mds stat", shell = True, stdout = subprocess.PIPE)
out = p.stdout.readlines()
if out[0].find('active') != -1:
command = "echo '0205pm %s round mds status OK, %s' >> /round_record" %(round, datetime.datetime.now())
command_2 = "time (ls /zbkc/test_mds_crash/) 2>>/round_record"
command_3 = "ls /zbkc/test_mds_crash | wc -l >> /round_record"
subprocess.call(command,shell=True)
subprocess.call(command_2,shell=True)
subprocess.call(command_3,shell=True)
return 1
else:
command = "echo '0205 %s round mds status abnormal, %s, %s' >> /round_record" %(round, out[0], datetime.datetime.now())
subprocess.call(command,shell=True)
return 0
#threads = []
for round in range(1, 1600):
pool = multiprocessing.Pool(processes = 10) #使用進程池
for th in range(10):
# th_name = "thread-" + str(th)
# threads.append(th_name) #添加線程到線程列表
# threading.Thread(target = dd_test, args = (round, th), name = th_name).start() #創建多線程任務
pool.apply_async(dd_test, (round, th))
pool.close()
pool.join()
#等待線程完成
# for t in threads:
# t.join()
if mds_stat(round) == 0:
subprocess.call("zbkc -s",shell=True)
break
3. python怎麼實現一個進程
想要充分利用多核CPU資源,Python中大部分情況下都需要使用多進程,Python中提供了multiprocessing這個包實現多進程。multiprocessing支持子進程、進程間的同步與通信,提供了Process、Queue、Pipe、Lock等組件。
開辟子進程
multiprocessing中提供了Process類來生成進程實例
Process([group [, target [, name [, args [, kwargs]]]]])1
group分組,實際上不使用
target表示調用對象,你可以傳入方法的名字
args表示給調用對象以元組的形式提供參數,比如target是函數a,他有兩個參數m,n,那麼該參數為args=(m, n)即可
kwargs表示調用對象的字典
name是別名,相當於給這個進程取一個名字
先來個小例子:
運行結果:
Parent process run. subProcess is 30196
Parent process end,Mon Mar 27 11:20:21 2017
subProcess 30196 run, Mon Mar 27 11:20:21 2017
subProcess 30196 run, Mon Mar 27 11:20:23 2017
subProcess 30196 run, Mon Mar 27 11:20:25 2017
根據運行結果可知,父進程運行結束後子進程仍然還在運行,這可能造成僵屍( zombie)進程。
通常情況下,當子進程終結時,它會通知父進程,清空自己所佔據的內存,並在內核里留下自己的退出信息。父進程在得知子進程終結時,會從內核中取出子進程的退出信息。但是,如果父進程早於子進程終結,這可能造成子進程的退出信息滯留在內核中,子進程成為僵屍(zombie)進程。當大量僵屍進程積累時,內存空間會被擠占。
有什麼辦法可以避免僵屍進程呢?
這里介紹進程的一個屬性 deamon,當其值為TRUE時,其父進程結束,該進程也直接終止運行(即使還沒運行完)。
所以給上面的程序加上p.deamon = true,看看效果。
執行結果:
Parent process run. subProcess is 31856
Parent process end,Mon Mar 27 11:40:10 2017
這是問題又來了,子進程並沒有執行完,這不是所期望的結果。有沒辦法將子進程執行完後才讓父進程結束呢?
這里引入p.join()方法,它使子進程執行結束後,父進程才執行之後的代碼
執行結果:
subProcess 32076 run, Mon Mar 27 11:46:07 2017
subProcess 32076 run, Mon Mar 27 11:46:09 2017
subProcess 32076 run, Mon Mar 27 11:46:11 2017
Parent process run. subProcess is 32076
Parent process end,Mon Mar 27 11:46:13 2017
這樣所有的進程就能順利的執行了。
將進程定義成類
通過繼承Process類,來自定義進程類,實現run方法。實例p通過調用p.start()時自動調用run方法。
如下:
執行結果和上一個例子相同。
創建多個進程
很多時候系統都需要創建多個進程以提高CPU的利用率,當數量較少時,可以手動生成一個個Process實例。當進程數量很多時,或許可以利用循環,但是這需要程序員手動管理系統中並發進程的數量,有時會很麻煩。這時進程池Pool就可以發揮其功效了。可以通過傳遞參數限制並發進程的數量,默認值為CPU的核數。
直接上例子:
執行結果:
開頭部分
Run the main process (30920).
Waiting for all subprocesses done …
Run child process Process0 (32396)
Run child process Process3 (25392)
Run child process Process1 (28732)
Run child process Process2 (32436)
末尾部分:
Run child process Process15 (25880)
All subprocesses done
All process last 2.49 seconds.
相關說明:
這里進程池對並發進程的限制數量為8個,而程序運行時會產生16個進程,進程池將自動管理系統內進程的並發數量,其餘進程將會在隊列中等待。限制並發數量是因為,系統中並發的進程不是越多越好,並發進程太多,可能使CPU大部分的時間用於進程調度,而不是執行有效的計算。
採用多進程並發技術時,就單個處理機而言,其對進程的執行是串列的。但具體某個時刻哪個進程獲得CPU資源而執行是不可預知的(如執行結果的開頭部分,各進程的執行順序不定),這就體現了進程的非同步性。
如果單個程序執行14次run_proc函數,那麼它會需要至少16秒,通過進程的並發,這里只需要2.49秒,可見並發的優勢。
4. python多進程為什麼一定要
前面講了為什麼Python里推薦用多進程而不是多線程,但是多進程也有其自己的限制:相比線程更加笨重、切換耗時更長,並且在python的多進程下,進程數量不推薦超過CPU核心數(一個進程只有一個GIL,所以一個進程只能跑滿一個CPU),因為一個進程佔用一個CPU時能充分利用機器的性能,但是進程多了就會出現頻繁的進程切換,反而得不償失。
不過特殊情況(特指IO密集型任務)下,多線程是比多進程好用的。
舉個例子:給你200W條url,需要你把每個url對應的頁面抓取保存起來,這種時候,單單使用多進程,效果肯定是很差的。為什麼呢?
例如每次請求的等待時間是2秒,那麼如下(忽略cpu計算時間):
1、單進程+單線程:需要2秒*200W=400W秒==1111.11個小時==46.3天,這個速度明顯是不能接受的2、單進程+多線程:例如我們在這個進程中開了10個多線程,比1中能夠提升10倍速度,也就是大約4.63天能夠完成200W條抓取,請注意,這里的實際執行是:線程1遇見了阻塞,CPU切換到線程2去執行,遇見阻塞又切換到線程3等等,10個線程都阻塞後,這個進程就阻塞了,而直到某個線程阻塞完成後,這個進程才能繼續執行,所以速度上提升大約能到10倍(這里忽略了線程切換帶來的開銷,實際上的提升應該是不能達到10倍的),但是需要考慮的是線程的切換也是有開銷的,所以不能無限的啟動多線程(開200W個線程肯定是不靠譜的)3、多進程+多線程:這里就厲害了,一般來說也有很多人用這個方法,多進程下,每個進程都能佔一個cpu,而多線程從一定程度上繞過了阻塞的等待,所以比單進程下的多線程又更好使了,例如我們開10個進程,每個進程里開20W個線程,執行的速度理論上是比單進程開200W個線程快10倍以上的(為什麼是10倍以上而不是10倍,主要是cpu切換200W個線程的消耗肯定比切換20W個進程大得多,考慮到這部分開銷,所以是10倍以上)。
還有更好的方法嗎?答案是肯定的,它就是:
4、協程,使用它之前我們先講講what/why/how(它是什麼/為什麼用它/怎麼使用它)what:
協程是一種用戶級的輕量級線程。協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:
協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
在並發編程中,協程與線程類似,每個協程表示一個執行單元,有自己的本地數據,與其它協程共享全局數據和其它資源。
why:
目前主流語言基本上都選擇了多線程作為並發設施,與線程相關的概念是搶占式多任務(Preemptive multitasking),而與協程相關的是協作式多任務。
不管是進程還是線程,每次阻塞、切換都需要陷入系統調用(system call),先讓CPU跑操作系統的調度程序,然後再由調度程序決定該跑哪一個進程(線程)。
而且由於搶占式調度執行順序無法確定的特點,使用線程時需要非常小心地處理同步問題,而協程完全不存在這個問題(事件驅動和非同步程序也有同樣的優點)。
因為協程是用戶自己來編寫調度邏輯的,對CPU來說,協程其實是單線程,所以CPU不用去考慮怎麼調度、切換上下文,這就省去了CPU的切換開銷,所以協程在一定程度上又好於多線程。
how:
python裡面怎麼使用協程?答案是使用gevent,使用方法:看這里使用協程,可以不受線程開銷的限制,我嘗試過一次把20W條url放在單進程的協程里執行,完全沒問題。
所以最推薦的方法,是多進程+協程(可以看作是每個進程里都是單線程,而這個單線程是協程化的)多進程+協程下,避開了CPU切換的開銷,又能把多個CPU充分利用起來,這種方式對於數據量較大的爬蟲還有文件讀寫之類的效率提升是巨大的。
小例子:
#-*- coding=utf-8 -*-
import requests
from multiprocessing import Process
import gevent
from gevent import monkey; monkey.patch_all()import sys
reload(sys)
sys.setdefaultencoding('utf8')
def fetch(url):
try:
s = requests.Session()
r = s.get(url,timeout=1)#在這里抓取頁面
except Exception,e:
print e
return ''
def process_start(tasks):
gevent.joinall(tasks)#使用協程來執行
def task_start(filepath,flag = 100000):#每10W條url啟動一個進程with open(filepath,'r') as reader:#從給定的文件中讀取urlurl = reader.readline().strip()
task_list = []#這個list用於存放協程任務
i = 0 #計數器,記錄添加了多少個url到協程隊列while url!='':
i += 1
task_list.append(gevent.spawn(fetch,url,queue))#每次讀取出url,將任務添加到協程隊列if i == flag:#一定數量的url就啟動一個進程並執行p = Process(target=process_start,args=(task_list,))p.start()
task_list = [] #重置協程隊列
i = 0 #重置計數器
url = reader.readline().strip()
if task_list not []:#若退出循環後任務隊列里還有url剩餘p = Process(target=process_start,args=(task_list,))#把剩餘的url全都放到最後這個進程來執行p.start()
if __name__ == '__main__':
task_start('./testData.txt')#讀取指定文件細心的同學會發現:上面的例子中隱藏了一個問題:進程的數量會隨著url數量的增加而不斷增加,我們在這里不使用進程池multiprocessing.Pool來控制進程數量的原因是multiprocessing.Pool和gevent有沖突不能同時使用,但是有興趣的同學可以研究一下gevent.pool這個協程池。
另外還有一個問題:每個進程處理的url是累積的而不是獨立的,例如第一個進程會處理10W個,第二個進程會變成20W個,以此類推。最後定位到問題是gevent.joinall()導致的問題,有興趣的同學可以研究一下為什麼會這樣。不過這個問題的處理方案是:主進程只負責讀取url然後寫入到list中,在創建子進程的時候直接把list傳給子進程,由子進程自己去構建協程。這樣就不會出現累加的問題
5. python 多進程的順序問題
因為進程池一次只能運行4個進程,0,1,2,3是四個進程同時執行,那麼4隻能等待。當進程池中任意一個進程結束後,4立即執行,所以在0結束後4開始執行,接著1,2,3陸續結束,4最後結束。
6. 為什麼在Python里推薦使用多進程而不是多線程
監控一個信號就起一個線程與進程處理。這樣的邏輯是不太合適的。所有的資源都是有限的,如果這樣浪費很快會資源管理失控。
常規的做法是起一個線程池,或者是進程池。 使用線程還是進程取決於你處理的信號的類型。如果計算量大,則需要進程池,如果只是設備等待,比如網路數據收發,則線程也勉強夠用。
信號過來後處理方法有兩種,一種是實時處理,這個沒有好辦法,可以用「微線程」的辦法做,盡量減少處理周期。另外一種是允許少量的延遲。那麼通常的做法是用隊列。將信號放到線程或者是進程池的消息隊列里。然後再由後者分配。
還有一種高效的處理方法,根據信號的值做hash,然後自動分發到不同的CPU或者是伺服器。這個就算是大規模並發處理機制。
通常情況下,比如一個WEB伺服器,它需要獲取一個請求,然後處理響應,可以使用線程模型,或者是進程模型。也是使用典型的池的方法。一個Pool的大於,取決於你的計算 機的計算 能力,內存大小,以及你的並發訪問數量。
所要要啟用多少個呢?假設你的一個信號的處理周期是1秒,你同時有100個信號進來,那麼就需要100個線程或者是進程。
7. python中多進程+協程的使用以及為什麼要用它
前面講了為什麼python里推薦用多進程而不是多線程,但是多進程也有其自己的限制:相比線程更加笨重、切換耗時更長,並且在python的多進程下,進程數量不推薦超過CPU核心數(一個進程只有一個GIL,所以一個進程只能跑滿一個CPU),因為一個進程佔用一個CPU時能充分利用機器的性能,但是進程多了就會出現頻繁的進程切換,反而得不償失。
不過特殊情況(特指IO密集型任務)下,多線程是比多進程好用的。
舉個例子:給你200W條url,需要你把每個url對應的頁面抓取保存起來,這種時候,單單使用多進程,效果肯定是很差的。為什麼呢?
例如每次請求的等待時間是2秒,那麼如下(忽略cpu計算時間):
1、單進程+單線程:需要2秒*200W=400W秒==1111.11個小時==46.3天,這個速度明顯是不能接受的
2、單進程+多線程:例如我們在這個進程中開了10個多線程,比1中能夠提升10倍速度,也就是大約4.63天能夠完成200W條抓取,請注意,這里的實際執行是:線程1遇見了阻塞,CPU切換到線程2去執行,遇見阻塞又切換到線程3等等,10個線程都阻塞後,這個進程就阻塞了,而直到某個線程阻塞完成後,這個進程才能繼續執行,所以速度上提升大約能到10倍(這里忽略了線程切換帶來的開銷,實際上的提升應該是不能達到10倍的),但是需要考慮的是線程的切換也是有開銷的,所以不能無限的啟動多線程(開200W個線程肯定是不靠譜的)
3、多進程+多線程:這里就厲害了,一般來說也有很多人用這個方法,多進程下,每個進程都能佔一個cpu,而多線程從一定程度上繞過了阻塞的等待,所以比單進程下的多線程又更好使了,例如我們開10個進程,每個進程里開20W個線程,執行的速度理論上是比單進程開200W個線程快10倍以上的(為什麼是10倍以上而不是10倍,主要是cpu切換200W個線程的消耗肯定比切換20W個進程大得多,考慮到這部分開銷,所以是10倍以上)。
還有更好的方法嗎?答案是肯定的,它就是:
4、協程,使用它之前我們先講講what/why/how(它是什麼/為什麼用它/怎麼使用它)
what:
協程是一種用戶級的輕量級線程。協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:
協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
在並發編程中,協程與線程類似,每個協程表示一個執行單元,有自己的本地數據,與其它協程共享全局數據和其它資源。
why:
目前主流語言基本上都選擇了多線程作為並發設施,與線程相關的概念是搶占式多任務(Preemptive multitasking),而與協程相關的是協作式多任務。
不管是進程還是線程,每次阻塞、切換都需要陷入系統調用(system call),先讓CPU跑操作系統的調度程序,然後再由調度程序決定該跑哪一個進程(線程)。
而且由於搶占式調度執行順序無法確定的特點,使用線程時需要非常小心地處理同步問題,而協程完全不存在這個問題(事件驅動和非同步程序也有同樣的優點)。
因為協程是用戶自己來編寫調度邏輯的,對CPU來說,協程其實是單線程,所以CPU不用去考慮怎麼調度、切換上下文,這就省去了CPU的切換開銷,所以協程在一定程度上又好於多線程。
how:
python裡面怎麼使用協程?答案是使用gevent,使用方法:看這里
使用協程,可以不受線程開銷的限制,我嘗試過一次把20W條url放在單進程的協程里執行,完全沒問題。
所以最推薦的方法,是多進程+協程(可以看作是每個進程里都是單線程,而這個單線程是協程化的)
多進程+協程下,避開了CPU切換的開銷,又能把多個CPU充分利用起來,這種方式對於數據量較大的爬蟲還有文件讀寫之類的效率提升是巨大的。
小例子:
[python]view plain
#-*-coding=utf-8-*-
importrequests
importgevent
fromgeventimportmonkey;monkey.patch_all()
importsys
reload(sys)
sys.setdefaultencoding('utf8')
deffetch(url):
try:
s=requests.Session()
r=s.get(url,timeout=1)#在這里抓取頁面
exceptException,e:
printe
return''
defprocess_start(url_list):
tasks=[]
forurlinurl_list:
tasks.append(gevent.spawn(fetch,url))
gevent.joinall(tasks)#使用協程來執行
deftask_start(filepath,flag=100000):#每10W條url啟動一個進程
withopen(filepath,'r')asreader:#從給定的文件中讀取url
url=reader.readline().strip()
url_list=[]#這個list用於存放協程任務
i=0#計數器,記錄添加了多少個url到協程隊列
whileurl!='':
i+=1
url_list.append(url)#每次讀取出url,將url添加到隊列
ifi==flag:#一定數量的url就啟動一個進程並執行
p=Process(target=process_start,args=(url_list,))
p.start()
url_list=[]#重置url隊列
i=0#重置計數器
url=reader.readline().strip()
ifurl_listnot[]:#若退出循環後任務隊列里還有url剩餘
p=Process(target=process_start,args=(url_list,))#把剩餘的url全都放到最後這個進程來執行
p.start()
if__name__=='__main__':
task_start('./testData.txt')#讀取指定文件
細心的同學會發現:上面的例子中隱藏了一個問題:進程的數量會隨著url數量的增加而不斷增加,我們在這里不使用進程池multiprocessing.Pool來控制進程數量的原因是multiprocessing.Pool和gevent有沖突不能同時使用,但是有興趣的同學可以研究一下gevent.pool這個協程池。
8. 請教python高手有關多進程池的問題
和那個沒關系,是因為進程執行的是文件級別的,你把print放到if __name__==main下面就解決了,別停他們瞎說
9. python 多線程與多進程問題
監控一個信號就起一個線程與進程處理。這樣的邏輯是不太合適的。所有的資源都是有限的,如果這樣浪費很快會資源管理失控。
常規的做法是起一個線程池,或者是進程池。 使用線程還是進程取決於你處理的信號的類型。如果計算量大,則需要進程池,如果只是設備等待,比如網路數據收發,則線程也勉強夠用。
信號過來後處理方法有兩種,一種是實時處理,這個沒有好辦法,可以用「微線程」的辦法做,盡量減少處理周期。另外一種是允許少量的延遲。那麼通常的做法是用隊列。將信號放到線程或者是進程池的消息隊列里。然後再由後者分配。
還有一種高效的處理方法,根據信號的值做hash,然後自動分發到不同的CPU或者是伺服器。這個就算是大規模並發處理機制。
通常情況下,比如一個WEB伺服器,它需要獲取一個請求,然後處理響應,可以使用線程模型,或者是進程模型。也是使用典型的池的方法。一個Pool的大於,取決於你的計算 機的計算 能力,內存大小,以及你的並發訪問數量。
所要要啟用多少個呢?假設你的一個信號的處理周期是1秒,你同時有100個信號進來,那麼就需要100個線程或者是進程。
線程數過多,表面上處理能力在增加,不過延遲也在增加,失敗率也會增加。