導航:首頁 > 編程語言 > python函數任務隊列

python函數任務隊列

發布時間:2024-06-30 22:50:11

1. python實現簡單多線程任務隊列

Python實現簡單多線程任務隊列
最近我在用梯度下降演算法繪制神經網路的數據時,遇到了一些演算法性能的問題。梯度下降演算法的代碼如下(偽代碼):
defgradient_descent(): # the gradient descent code plotly.write(X, Y)
一般來說,當網路請求 plot.ly 繪圖時會阻塞等待返回,於是也會影響到其他的梯度下降函數的執行速度。
一種解決辦法是每調用一次 plotly.write 函數就開啟一個新的線程,但是這種方法感覺不是很好。 我不想用一個像 cerely(一種分布式任務隊列)一樣大而全的任務隊列框架,因為框架對於我的這點需求來說太重了,並且我的繪圖也並不需要 redis 來持久化數據。
那用什麼辦法解決呢?我在 python 中寫了一個很小的任務隊列,它可以在一個單獨的線程中調用 plotly.write函數。下面是程序代碼。
classTaskQueue(Queue.Queue):
首先我們繼承 Queue.Queue 類。從 Queue.Queue 類可以繼承 get 和 put 方法,以及隊列的行為。
def__init__(self, num_workers=1): Queue.Queue.__init__(self) self.num_workers=num_workers self.start_workers()
初始化的時候,我們可以不用考慮工作線程的數量。
defadd_task(self, task,*args,**kwargs): args=argsor() kwargs=kwargsor{} self.put((task, args, kwargs))
我們把 task, args, kwargs 以元組的形式存儲在隊列中。*args 可以傳遞數量不等的參數,**kwargs 可以傳遞命名參數。
defstart_workers(self): foriinrange(self.num_workers): t=Thread(target=self.worker) t.daemon=True t.start()
我們為每個 worker 創建一個線程,然後在後台刪除。
下面是 worker 函數的代碼:
defworker(self): whileTrue: tupl=self.get() item, args, kwargs=self.get() item(*args,**kwargs) self.task_done()
worker 函數獲取隊列頂端的任務,並根據輸入參數運行,除此之外,沒有其他的功能。下面是隊列的代碼:
我們可以通過下面的代碼測試:
defblokkah(*args,**kwargs): time.sleep(5) print「Blokkah mofo!」 q=TaskQueue(num_workers=5) foriteminrange(1): q.add_task(blokkah) q.join()# wait for all the tasks to finish. print「Alldone!」
Blokkah 是我們要做的任務名稱。隊列已經緩存在內存中,並且沒有執行很多任務。下面的步驟是把主隊列當做單獨的進程來運行,這樣主程序退出以及執行資料庫持久化時,隊列任務不會停止運行。但是這個例子很好地展示了如何從一個很簡單的小任務寫成像工作隊列這樣復雜的程序。
defgradient_descent(): # the gradient descent code queue.add_task(plotly.write, x=X, y=Y)
修改之後,我的梯度下降演算法工作效率似乎更高了。如果你很感興趣的話,可以參考下面的代碼。 classTaskQueue(Queue.Queue): def__init__(self, num_workers=1):Queue.Queue.__init__(self)self.num_workers=num_workersself.start_workers() defadd_task(self, task,*args,**kwargs):args=argsor()kwargs=kwargsor{}self.put((task, args, kwargs)) defstart_workers(self):foriinrange(self.num_workers):t=Thread(target=self.worker)t.daemon=Truet.start() defworker(self):whileTrue:tupl=self.get()item, args, kwargs=self.get()item(*args,**kwargs)self.task_done() deftests():defblokkah(*args,**kwargs):time.sleep(5)print"Blokkah mofo!" q=TaskQueue(num_workers=5) foriteminrange(10):q.add_task(blokkah) q.join()# block until all tasks are doneprint"All done!" if__name__=="__main__":tests()

2. Python 隊列queue與多線程組合(生產者+消費者模式)

在線程世界⾥,⽣產者就是⽣產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果⽣產者處理速度很快,⽽消費者處理速度很慢,那麼⽣產者就必須等待消費者處理完,才能繼續⽣產數據。同樣的道理,如果消費者的處理能⼒⼤於⽣產者,那麼消費者就必須等待⽣產者。為了解決這個問題於是引⼊了⽣產者和消費者模式。

⽣產者消費者模式是通過⼀個容器來解決⽣產者和消費者的強耦合問題。⽣產者和消費者彼此之間不直接通訊,⽽通過阻塞隊列來進⾏通訊,所以⽣產者⽣產完數據之後不⽤等待消費者處理,直接扔給阻塞隊列,消費者不找⽣產者要數據,⽽是唯租直接從阻塞隊列⾥取,阻塞隊列就相當於⼀個緩沖區,平衡了⽣產者和消費者的處理能⼒。

比如,對於同時爬取多個網頁的多線程爬蟲,在某一時刻你可能無法保證他們在處理不同的網站,在某些時刻他們極有可能在處理相同的網站,這豈不浪費?為了解決這個問題,可以將不同網頁的url放在queue中,然後多個線程來讀取queue中的url進行解析處理,而queue只允許一次出一個,出一個少一個。相同網站上不同網頁的url通常有某種規律,比如某個欄位的數字加1,這種情況完全可以用這種模式,「生產者程序」負責根據規律把完整的url製作出來,再塞進queue裡面(如果queue滿了,則等待);「消費者程序(網頁解析程序)」從queue的後面答團挨個取出url進行解析(如果queue裡面是空的,則等待),即使是多線程也能保證每個線程得到的是不同的url。這個過程中,生產者和消費彼此互不幹涉。

下面以實例說明如何將queue與多線程相結合形成所謂的「 生產者+消費者 」模式,同時解決 多線程如何退出 的問題(注意下例中是「一個生產者+多個消費者」的形式,多生產者+多消費者的模式可在此基礎上進一步實現):

上述程序的過程如下圖:

注意
(1)上述程序中生產者插入queue的時間間隔為0.1s,而消費者的取出時間間隔為2s,顯然消費速度不如生產速度,一開始queue是空的,一段時間後queue就變滿了,輸出結果正說明了這一點。如果將兩個時間調換,則結果相反,queue永遠不會滿,甚至只有1個值,因為只要進去就被消費了。
(2)消費者程序是通過「while」來推動不斷執行的,何時結束?上例中通過在queue中增加None的形式告訴消費者,生產者已經結束了,消費者也可以結束了。但消費者有多個,到底由哪個消費者得到None?為解決這個問題,上例中在消費者中先判斷當前取出的是不是None,如果是,則先在queue里插入一個None,然後再break當前這個消費者線程,最後的結果是所有的消費者線程都退出了,但queue中還剩下None沒有被取出。因此在程序的後面增加了一個for循環來挨個把queue中的元素取出,否則最後的q.join()將永遠阻塞,程序無法往下執行。
(3)程序中每一個q.get()後面都跟有一個q.task_done(),其作用指舉兆是從queue中取出一個元素就給q.join()發送一個信息,否則q.join()將永遠處於阻塞狀態,直到所有queue元素都被取出。

多線程「生產者-消費者」模式一般性結構圖

3. python多進程中隊列不空時阻塞,求解為什麼

最近接觸一個項目,要在多個虛擬機中運行任務,參考別人之前項目的代碼,採用了多進程來處理,於是上網查了查python中的多進程

一、先說說Queue(隊列對象)

Queue是python中的標准庫,可以直接import 引用,之前學習的時候有聽過著名的「先吃先拉」與「後吃先吐」,其實就是這里說的隊列,隊列的構造的時候可以定義它的容量,別吃撐了,吃多了,就會報錯,構造的時候不寫或者寫個小於1的數則表示無限多

import Queue

q = Queue.Queue(10)

向隊列中放值(put)

q.put(『yang')

q.put(4)

q.put([『yan','xing'])

在隊列中取值get()

默認的隊列是先進先出的

>>> q.get()
『yang'
>>> q.get()
4
>>> q.get()
[『yan', 『xing']

當一個隊列為空的時候如果再用get取則會堵塞,所以取隊列的時候一般是用到

get_nowait()方法,這種方法在向一個空隊列取值的時候會拋一個Empty異常

所以更常用的方法是先判斷一個隊列是否為空,如果不為空則取值

隊列中常用的方法

Queue.qsize() 返回隊列的大小
Queue.empty() 如果隊列為空,返回True,反之False
Queue.full() 如果隊列滿了,返回True,反之False
Queue.get([block[, timeout]]) 獲取隊列,timeout等待時間
Queue.get_nowait() 相當Queue.get(False)
非阻塞 Queue.put(item) 寫入隊列,timeout等待時間
Queue.put_nowait(item) 相當Queue.put(item, False)

二、multiprocessing中使用子進程概念

from multiprocessing import Process

可以通過Process來構造一個子進程

p = Process(target=fun,args=(args))

再通過p.start()來啟動子進程

再通過p.join()方法來使得子進程運行結束後再執行父進程

from multiprocessing import Process
import os

# 子進程要執行的代碼
def run_proc(name):
print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__=='__main__':
print 'Parent process %s.' % os.getpid()
p = Process(target=run_proc, args=('test',))
print 'Process will start.'
p.start()
p.join()
print 'Process end.'

上面的程序運行後的結果其實是按照上圖中1,2,3分開進行的,先列印1,3秒後列印2,再3秒後列印3

代碼中的p.close()是關掉進程池子,是不再向裡面添加進程了,對Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),調用close()之後就不能繼續添加新的Process了。

當時也可以是實例pool的時候給它定義一個進程的多少

如果上面的代碼中p=Pool(5)那麼所有的子進程就可以同時進行

三、多個子進程間的通信

多個子進程間的通信就要採用第一步中說到的Queue,比如有以下的需求,一個子進程向隊列中寫數據,另外一個進程從隊列中取數據,

#coding:gbk

from multiprocessing import Process, Queue
import os, time, random

# 寫數據進程執行的代碼:
def write(q):
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
time.sleep(random.random())

# 讀數據進程執行的代碼:
def read(q):
while True:
if not q.empty():
value = q.get(True)
print 'Get %s from queue.' % value
time.sleep(random.random())
else:
break

if __name__=='__main__':
# 父進程創建Queue,並傳給各個子進程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動子進程pw,寫入:
pw.start()
# 等待pw結束:
pw.join()
# 啟動子進程pr,讀取:
pr.start()
pr.join()
# pr進程里是死循環,無法等待其結束,只能強行終止:
print
print '所有數據都寫入並且讀完'

四、關於上面代碼的幾個有趣的問題

if __name__=='__main__':
# 父進程創建Queue,並傳給各個子進程:
q = Queue()
p = Pool()
pw = p.apply_async(write,args=(q,))
pr = p.apply_async(read,args=(q,))
p.close()
p.join()

print
print '所有數據都寫入並且讀完'

如果main函數寫成上面的樣本,本來我想要的是將會得到一個隊列,將其作為參數傳入進程池子里的每個子進程,但是卻得到

RuntimeError: Queue objects should only be shared between processes through inheritance

的錯誤,查了下,大意是隊列對象不能在父進程與子進程間通信,這個如果想要使用進程池中使用隊列則要使用multiprocess的Manager類

if __name__=='__main__':
manager = multiprocessing.Manager()
# 父進程創建Queue,並傳給各個子進程:
q = manager.Queue()
p = Pool()
pw = p.apply_async(write,args=(q,))
time.sleep(0.5)
pr = p.apply_async(read,args=(q,))
p.close()
p.join()

print
print '所有數據都寫入並且讀完'

這樣這個隊列對象就可以在父進程與子進程間通信,不用池則不需要Manager,以後再擴展multiprocess中的Manager類吧

關於鎖的應用,在不同程序間如果有同時對同一個隊列操作的時候,為了避免錯誤,可以在某個函數操作隊列的時候給它加把鎖,這樣在同一個時間內則只能有一個子進程對隊列進行操作,鎖也要在manager對象中的鎖

#coding:gbk

from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random

# 寫數據進程執行的代碼:
def write(q,lock):
lock.acquire() #加上鎖
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
lock.release() #釋放鎖

# 讀數據進程執行的代碼:
def read(q):
while True:
if not q.empty():
value = q.get(False)
print 'Get %s from queue.' % value
time.sleep(random.random())
else:
break

if __name__=='__main__':
manager = multiprocessing.Manager()
# 父進程創建Queue,並傳給各個子進程:
q = manager.Queue()
lock = manager.Lock() #初始化一把鎖
p = Pool()
pw = p.apply_async(write,args=(q,lock))
pr = p.apply_async(read,args=(q,))
p.close()
p.join()

print
print '所有數據都寫入並且讀完'

4. Python 非同步任務隊列Celery 使用

在 Python 中定義 Celery 的時候,我們要引入 Broker,中文翻譯過來就是「中間人」的意思。在工頭(生產者)提出任務的時候,把所有的任務放到 Broker 裡面,在 Broker 的另外一頭,一群碼農(消費者)等著取出一個個任務准備著手做。這種模式註定了整個系統會是個開環系統,工頭對於碼農們把任務做的怎樣是不知情的。所以我們要引入 Backend 來保存每次任務的結果。這個 Backend 也是存儲任務的信息用的,只不過這里存的是那些任務的返回結果。我們可以選擇只讓錯誤執行的任務返回結果到 Backend,這樣我們取回結果,便可以知道有多少任務執行失敗了。

其實現架構如下圖所示:

可以看到,Celery 主要包含以下幾個模塊:

celery可以通過pip自動安裝。

broker 可選擇使用RabbitMQ/redis,backend可選擇使用RabbitMQ/redis/MongoDB。RabbitMQ/redis/mongoDB的安裝請參考對應的官方文檔。

------------------------------rabbitmq相關----------------------------------------------------------

官網安裝方法: http://www.rabbitmq.com/install-windows.html

啟動管理插件:sbin/rabbitmq-plugins enable rabbitmq_management 啟動rabbitmq:sbin/rabbitmq-server -detached

rabbitmq已經啟動,可以打開頁面來看看 地址: http://localhost:15672/#/

用戶名密碼都是guest 。進入可以看到具體頁面。 關於rabbitmq的配置,網上很多 自己去搜以下就ok了。

------------------------------rabbitmq相關--------------------------------------------------------

項目結構如下:

使用前,需要三個方面:celery配置,celery實例,需執行的任務函數,如下:

Celery 的配置比較多,可以在 官方配置文檔: http://docs.celeryproject.org/en/latest/userguide/configuration.html 查詢每個配置項的含義。

當然,要保證上述非同步任務and下述定時任務都能正常執行,就需要先啟動celery worker,啟動命令行如下:

啟動beat ,執行定時任務時, Celery會通過celery beat進程來完成。Celery beat會保持運行, 一旦到了某一定時任務需要執行時, Celery beat便將其加入到queue中. 不像worker進程, Celery beat只需要一個即可。而且為了避免有重復的任務被發送出去,所以Celery beat僅能有一個。

命令行啟動:

如果你想將celery worker/beat要放到後台運行,推薦可以扔給supervisor。

supervisor.conf如下:

5. 璋堣皥python閲岄潰鍏充簬浠誨姟闃熷垪

瑕佸洖絳旇繖涓闂棰樻垜浠棣栧厛鐪嬬湅鍦ㄦ祦姘寸嚎涓婄殑妗堝垪錛屽傛灉浜虹殑閫熷害寰堟參錛屾満鍣ㄧ殑閫熷害姣斾漢鐨勯熷害蹇寰堝氾紝灝變細閫犳垚錛屾満鍣ㄧ敓浜х殑涓滆タ娌℃湁鍙婃椂澶勭悊錛岃秺縐瓚婂氾紝閫犳垚闃誨烇紝褰卞搷鐢熶駭銆

鎵撲釜姣旀柟濡傛灉鍑虹幇浜虹殑閫熷害璺熶笉涓婃満鍣ㄩ熷害鎬庝箞鍔烇紝榪欎釜鏃跺欐垜浠灝遍渶瑕佺涓夋柟錛岀洃綆′漢鍛橈紙浠誨姟闃熷垪錛夋妸鏈哄櫒鐢熶駭鐨勪笢瑗匡紝鏀懼湪涓涓鍦版柟錛岋紙闃熷垪錛夛紝鐒跺悗鍒嗛厤緇欐瘡涓鐢ㄦ埛錛屾湁鏉′笉鐞嗙殑鎵ц屻

娑堟伅闃熷垪
娑堟伅闃熷垪鐨勮緭鍏ユ槸宸ヤ綔鐨勪竴涓鍗曞厓錛岀О涓轟換鍔★紝鐙絝嬬殑鑱岀▼錛圵orker錛夎繘紼嬫寔緇鐩戣嗛槦鍒椾腑鏄鍚︽湁闇瑕佸勭悊鐨勬柊浠誨姟銆
Celery 鐢ㄦ秷鎮閫氫俊錛岄氬父浣跨敤涓闂翠漢錛圔roker錛夊湪瀹㈡埛絝鍜岃亴紼嬮棿鏂℃棆銆傝繖涓榪囩▼浠庡㈡埛絝鍚戦槦鍒楁坊鍔犳秷鎮寮濮嬶紝涔嬪悗涓闂翠漢鎶婃秷鎮媧鵑佺粰鑱岀▼錛岃亴紼嬪規秷鎮榪涜屽勭悊銆傚備笅鍥炬墍紺猴細

鐜澧

浠誨姟鑴氭湰

flower github

鍦234 涓奻lower.py 鐨勮剼鏈

6. python實現堆棧與隊列的方法

python實現堆棧與隊列的方法
本文實例講述了python實現堆棧與隊列的方法。分享給大家供大家參考。具體分析如下:
1、python實現堆棧,可先將Stack類寫入文件stack.py,在其它程序文件中使用from stack import Stack,然後就可以使用堆棧了。
stack.py的程序:
代碼如下:class Stack():
def __init__(self,size):
self.size=size;
self.stack=[];
self.top=-1;
def push(self,ele): #入棧之前檢查棧是否已滿
if self.isfull():
raise exception("out of range");
else:
self.stack.append(ele);
self.top=self.top+1;
def pop(self): # 出棧之前檢查棧是否為空
if self.isempty():
raise exception("stack is empty");
else:
self.top=self.top-1;
return self.stack.pop();

def isfull(self):
return self.top+1==self.size;
def isempty(self):
return self.top==-1;
再寫一個程序文件,stacktest.py,使用棧,內容如下:
代碼如下:#!/usr/bin/python
from stack import Stack
s=Stack(20);
for i in range(3):
s.push(i);
s.pop()
print s.isempty();

2、python 實現隊列:
復制代碼代碼如下:class Queue():
def __init__(self,size):
self.size=size;
self.front=-1;
self.rear=-1;
self.queue=[];
def enqueue(self,ele): #入隊操作
if self.isfull():
raise exception("queue is full");
else:
self.queue.append(ele);
self.rear=self.rear+1;
def dequeue(self): #出隊操作
if self.isempty():
raise exception("queue is empty");
else:
self.front=self.front+1;
return self.queue[self.front];
def isfull(self):
return self.rear-self.front+1==self.size;
def isempty(self):
return self.front==self.rear;

q=Queue(10);
for i in range(3):
q.enqueue(i);
print q.dequeue();
print q.isempty();
希望本文所述對大家的Python程序設計有所幫助。

7. python生成多個隊列

q=[]
foriinrange(9):
q.append(Queue())

閱讀全文

與python函數任務隊列相關的資料

熱點內容
spring如何添加app 瀏覽:664
python循環import 瀏覽:552
怎樣把js代碼加密 瀏覽:800
frp伺服器百度雲 瀏覽:792
12306演算法 瀏覽:630
單片機驅動小馬達 瀏覽:100
pythoncookbook27 瀏覽:518
c的指針和python 瀏覽:186
python寫sftp 瀏覽:957
讀文pdf 瀏覽:507
pythonnumpy內積 瀏覽:782
linux硬碟模式 瀏覽:15
怎麼查安卓的空間 瀏覽:589
linux命令復制命令 瀏覽:115
勞動法裡面有沒有帶工資演算法的 瀏覽:456
如何在u盤里拷解壓軟體 瀏覽:689
oracle資料庫登陸命令 瀏覽:616
python自動化運維之路 瀏覽:402
eclipsejava教程下載 瀏覽:989
tita搜索app怎麼配置 瀏覽:265