Ⅰ 如何在python中編寫並發程序
多進程/多線程+Queue
一般來說,在Python中編寫並發程序的經驗是:計算密集型任務使用多進程,IO密集型任務使用多進程或者多線程.另外,因為涉及到資源共享,所以需要同步鎖等一系列麻煩的步驟,代碼編寫不直觀.另外一種好的思路是利用多進程/多線程+Queue的方法,可以避免加鎖這樣麻煩低效的方式.
現在在Python2中利用Queue+多進程的方法來處理一個IO密集型任務.
假設現在需要下載多個網頁內容並進行解析,單進程的方式效率很低,所以使用多進程/多線程勢在必行.
我們可以先初始化一個tasks隊列,裡面將要存儲的是一系列dest_url,同時開啟4個進程向tasks中取任務然後執行,處理結果存儲在一個results隊列中,最後對results中的結果進行解析.最後關閉兩個隊列.
下面是一些主要的邏輯代碼.
# -*- coding:utf-8 -*-
#IO密集型任務
#多個進程同時下載多個網頁
#利用Queue+多進程
#由於是IO密集型,所以同樣可以利用threading模塊
import multiprocessing
def main():
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
cpu_count = multiprocessing.cpu_count() #進程數目==CPU核數目
create_process(tasks, results, cpu_count) #主進程馬上創建一系列進程,但是由於阻塞隊列tasks開始為空,副進程全部被阻塞
add_tasks(tasks) #開始往tasks中添加任務
parse(tasks, results) #最後主進程等待其他線程處理完成結果
def create_process(tasks, results, cpu_count):
for _ in range(cpu_count):
p = multiprocessing.Process(target=_worker, args=(tasks, results)) #根據_worker創建對應的進程
p.daemon = True #讓所有進程可以隨主進程結束而結束
p.start() #啟動
def _worker(tasks, results):
while True: #因為前面所有線程都設置了daemon=True,故不會無限循環
try:
task = tasks.get() #如果tasks中沒有任務,則阻塞
result = _download(task)
results.put(result) #some exceptions do not handled
finally:
tasks.task_done()
def add_tasks(tasks):
for url in get_urls(): #get_urls() return a urls_list
tasks.put(url)
def parse(tasks, results):
try:
tasks.join()
except KeyboardInterrupt as err:
print "Tasks has been stopped!"
print err
while not results.empty():
_parse(results)
if __name__ == '__main__':
main()
利用Python3中的concurrent.futures包
在Python3中可以利用concurrent.futures包,編寫更加簡單易用的多線程/多進程代碼.其使用感覺和Java的concurrent框架很相似(借鑒?)
比如下面的簡單代碼示例
def handler():
futures = set()
with concurrent.futures.ProcessPoolExecutor(max_workers=cpu_count) as executor:
for task in get_task(tasks):
future = executor.submit(task)
futures.add(future)
def wait_for(futures):
try:
for future in concurrent.futures.as_completed(futures):
err = futures.exception()
if not err:
result = future.result()
else:
raise err
except KeyboardInterrupt as e:
for future in futures:
future.cancel()
print "Task has been canceled!"
print e
return result
總結
要是一些大型Python項目也這般編寫,那麼效率也太低了.在Python中有許多已有的框架使用,使用它們起來更加高效.
但是自己的一些"小打小鬧"的程序這樣來編寫還是不錯的.:)
Ⅱ Python幾種並發實現方案的性能比較
1.目前推薦使用gevent吧,當然,結合起python的jit技術實現,可突破gil的限制,pypy還是可觀的,內存上可能比cpython佔用大些
Ⅲ python用例並發怎麼解決
python-selenium並發執行測試用例(方法一 各模塊每一條並發執行)
總執行代碼:
# coding=utf-8
import unittest,os,time
import HTMLTestRunner
import threading
import sys
sys.path.append('C:/Users/Dell/Desktop/CARE/program')#使用編輯器,要指定當前目錄,不然無法執行第20行代碼
def creatsuite():
casedir = []
list = os.listdir(os.path.dirname(os.getcwd()))#獲取當前路徑的上一級目錄的所有文件夾,這里可以改成絕對路徑(要搜索的文件路徑)
for xx in list:
if "" in xx:
casedir.append(xx)
suite =[]
for n in casedir:
testunit = unittest.TestSuite()
unittest.defaultTestLoader._top_level_dir = None
#(unittest.defaultTestLoader(): defaultTestLoader()類,通過該類下面的discover()方法可自動更具測試目錄start_dir匹配查找測試用例文件(test*.py),
並將查找到的測試用例組裝到測試套件,因此可以直接通過run()方法執行discover)
discover = unittest.defaultTestLoader.discover(str(n),pattern='tet_*.py',top_level_dir=None)
for test_suite in discover:
for test_case in test_suite:
testunit.addTests(test_case)
suite.append(testunit)
return suite, casedir
def runcase(suite,casedir):
lastPath = os.path.dirname(os.getcwd())#獲取當前路徑的上一級
resultDir = lastPath+"\\run\\report\\" #報告存放路徑
now = time.strftime("%Y-%m-%d %H.%M.%S",time.localtime())
filename = resultDir + now +" result.html"
fp = file(filename, 'wb')
proclist=[]
s=0
for i in suite:
runner = HTMLTestRunner.HTMLTestRunner(stream=fp,title=str(casedir[s])+u'測試報告',description=u'用例執行情況:')
proc = threading.Thread(target=runner.run,args=(i,))
proclist.append(proc)
s=s+1
for proc in proclist:
proc.start()
for proc in proclist:
proc.join()
fp.close()
if __name__ == "__main__":
runtmp=creatsuite()
runcase(runtmp[0],runtmp[1])
Ⅳ 有沒有人寫過python的一秒3000個並發請求的服務端
有個國產的web框架:eurasia, 業界大牛 沈老大 寫的,貌似用了stackless python
自己寫了一個tcp的server,epoll based,測過並發60k,不是web框架,沒測過每秒請求數。
python搭tcp/web server都很快,現成的web框架多,裸tcp的框架少
Ⅳ 如何用Python一門語言通吃高性能並發,GPU計算和深度學習
第一個就是並發本身所帶來的開銷即新開處理線程、關閉處理線程、多個處理線程時間片輪轉所帶來的開銷。
實際上對於一些邏輯不那麼復雜的場景來說這些開銷甚至比真正的處理邏輯部分代碼的開銷更大。所以我們決定採用基於協程的並發方式,即服務進程只有一個(單cpu)所有的請求數據都由這個服務進程內部來維護,同時服務進程自行調度不同請求的處理順序,這樣避免了傳統多線程並發方式新建、銷毀以及系統調度處理線程的開銷。基於這樣的考慮我們選擇了基於Tornado框架實現api服務的開發。Tornado的實現非常簡潔明了,使用python的生成器作為協程,利用IOLoop實現了調度隊列。
第二個問題是資料庫的性能,這里說的資料庫包括MongoDB和Redis,我這里分開講。
先講MongoDB的問題,MongoDB主要存儲不同的用戶對於驗證的不同設置,比如該顯示什麼樣的圖片。
一開始每次驗證請求都會查詢MongoDB,當時我們的MongoDB是純內存的,同時三台機器組成一個復制集,這樣的組合大概能穩定承載八九千的qps,後來隨著我們驗證量越來越大,這個承載能力逐漸就成為了我們的瓶頸。
為了徹底搞定這個問題,我們提出了最極端的解決方案,乾脆直接把資料庫中的數據完全緩存到服務進程里定期批量更新,這樣查詢的開銷將大大降低。但是因為我們用的是Python,由於GIL的存在,在8核伺服器上會fork出來8個服務進程,進程之間不像線程那麼方便,所以我們基於mmap自己寫了一套夥伴演算法構建了一個跨進程共享緩存。自從這套緩存上線之後,Mongodb的負載幾乎變成了零。
說完了MongoDB再說Redis的問題,Redis代碼簡潔、數據結構豐富、性能強大,唯一的問題是作為一個單進程程序,終究性能是有上限的。
雖然今年Redis發布了官方的集群版本,但是經過我們的測試,認為這套分布式方案的故障恢復時間不夠優秀並且運維成本較高。在Redis官方集群方案面世之前,開源世界有不少proxy方案,比如Twtter的TwemProxy和豌豆莢的Codis。這兩種方案測試完之後給我們的感覺TwemProxy運維還是比較麻煩,Codis使用起來讓人非常心曠神怡,無論是修改配置還是擴容都可以在配置頁面上完成,並且性能也還算不錯,但無奈當時Codis還有比較嚴重的BUG只能放棄之。
幾乎嘗試過各種方案之後,我們還是下決心自己實現一套分布式方案,目的是高度貼合我們的需求並且運維成本要低、擴容要方便、故障切換要快最重要的是數據冗餘一定要做好。
基於上面的考慮,我們確定基於客戶端的分布式方案,通過zookeeper來同步狀態保證高可用。具體來說,我們修改Redis源碼,使其向zookeeper注冊,客戶端由zookeeper上獲取Redis伺服器集群信息並根據統一的一致性哈希演算法來計算數據應該存儲在哪台Redis上,並在哈希環的下一台Redis上寫入一份冗餘數據,當讀取原始數據失敗時可以立即嘗試讀取冗餘數據而不會造成服務中斷。
Ⅵ 高並發,用Python適合嗎
Python不太適合高並發,雖然可以做,但是問題還是比較大,特別如果是後端服務,需要很高的高並發的話,還是用其他語言。
要高並發的話, 多進程+協程的組合的並發性能遠高於多線程。我在這篇文章中對python的並發方案有過比較。 像是要發各種請求的,其實和爬蟲類似, 協程的方案比較合適,能達到很高的並發。
Python簡介:
Python由荷蘭數學和計算機科學研究學會的Guido van Rossum於1990 年代初設計,作為一門叫做ABC語言的替代品。
Python提供了高效的高級數據結構,還能簡單有效地面向對象編程。Python語法和動態類型,以及解釋型語言的本質,使它成為多數平台上寫腳本和快速開發應用的編程語言,隨著版本的不斷更新和語言新功能的添加,逐漸被用於獨立的、大型項目的開發。
Ⅶ python現在做高並發伺服器 性能怎麼樣
你要相信一點,現在伺服器的瓶頸主要不在語言,而是磁碟IO,網路IO,業務邏輯等等。
對於幾乎所有現代語言,對C10K問題都能比較好的解決。
HTTP/2、非同步、協程、RESTful等等技術都在一定程度幫我們處理C10K問題,Python世界也有很多開源庫幫我們解決這些問題(換成Java也差不多)。
我公司目前使用的方案有:使用Nginx支持HTTP/2,實現簡單負載均衡,使用Python Tornado + RabbitMQ非同步處理耗時任務,但應用主體還是基於Python FlaskRESTful。
也許使用Java或Go可以提升性能,但我們看中的是Python的工程型、可讀性、可維護性,適合快速迭代開發。
Ⅷ python高並發怎麼解決
某個時間段內,數據涌來,這就是並發。如果數據量很大,就是高並發
高並發的解決方法:
1、隊列、緩沖區
假設只有一個窗口,陸續湧入食堂的人,排隊打菜是比較好的方式
所以,排隊(隊列)是一種天然解決並發的辦法
排隊就是把人排成 隊列,先進先出,解決了資源使用的問題
排成的隊列,其實就是一個緩沖地帶,就是 緩沖區
假設女生優先,每次都從這個隊伍中優先選出女生出來先打飯,這就是 優先隊列
例如queue模塊的類Queue、LifoQueue、PriorityQueue(小頂堆實現)
2、爭搶
只開一個窗口,有可能沒有秩序,也就是誰擠進去就給誰打飯
擠到窗口的人占據窗口,直到打到飯菜離開
其他人繼續爭搶,會有一個人占據著窗口,可以視為鎖定窗口,窗口就不能為其他人提供服務了。
這是一種鎖機制
誰搶到資源就上鎖,排他性的鎖,其他人只能等候
爭搶也是一種高並發解決方案,但是,這樣可能不好,因為有可能有人很長時間搶不到
3、預處理
如果排長隊的原因,是由於每個人打菜等候時間長,因為要吃的菜沒有,需要現做,沒打著飯不走開,鎖定著窗口
食堂可以提前統計大多數人最愛吃的菜品,將最愛吃的80%的熱門菜,提前做好,保證供應,20%的冷門菜,現做
這樣大多數人,就算鎖定窗口,也很快打到飯菜走了,快速釋放窗口
一種提前載入用戶需要的數據的思路,預處理 思想,緩存常用
更多Python知識,請關註:Python自學網!!
Ⅸ 如何使用Python實現並發編程
多線程幾乎是每一個程序猿在使用每一種語言時都會首先想到用於解決並發的工具(JS程序員請迴避),使用多線程可以有效的利用CPU資源(Python例外)。然而多線程所帶來的程序的復雜度也不可避免,尤其是對競爭資源的同步問題。
然而在python中由於使用了全局解釋鎖(GIL)的原因,代碼並不能同時在多核上並發的運行,也就是說,Python的多線程不能並發,很多人會發現使用多線程來改進自己的Python代碼後,程序的運行效率卻下降了,這是多麼蛋疼的一件事呀!如果想了解更多細節,推薦閱讀這篇文章。實際上使用多線程的編程模型是很困難的,程序員很容易犯錯,這並不是程序員的錯誤,因為並行思維是反人類的,我們大多數人的思維是串列(精神分裂不討論),而且馮諾依曼設計的計算機架構也是以順序執行為基礎的。所以如果你總是不能把你的多線程程序搞定,恭喜你,你是個思維正常的程序猿:)
Python提供兩組線程的介面,一組是thread模塊,提供基礎的,低等級(Low Level)介面,使用Function作為線程的運行體。還有一組是threading模塊,提供更容易使用的基於對象的介面(類似於Java),可以繼承Thread對象來實現線程,還提供了其它一些線程相關的對象,例如Timer,Lock
使用thread模塊的例子
import thread
def worker():
"""thread worker function"""
print 'Worker'
thread.start_new_thread(worker)
使用threading模塊的例子
import threading
def worker():
"""thread worker function"""
print 'Worker'
t = threading.Thread(target=worker)
t.start()
或者Java Style
import threading
class worker(threading.Thread):
def __init__(self):
pass
def run():
"""thread worker function"""
print 'Worker'
t = worker()
t.start()