『壹』 python實現簡單多線程任務隊列
Python實現簡單多線程任務隊列
最近我在用梯度下降演算法繪制神經網路的數據時,遇到了一些演算法性能的問題。梯度下降演算法的代碼如下(偽代碼):
defgradient_descent(): # the gradient descent code plotly.write(X, Y)
一般來說,當網路請求 plot.ly 繪圖時會阻塞等待返回,於是也會影響到其他的梯度下降函數的執行速度。
一種解決辦法是每調用一次 plotly.write 函數就開啟一個新的線程,但是這種方法感覺不是很好。 我不想用一個像 cerely(一種分布式任務隊列)一樣大而全的任務隊列框架,因為框架對於我的這點需求來說太重了,並且我的繪圖也並不需要 redis 來持久化數據。
那用什麼辦法解決呢?我在 python 中寫了一個很小的任務隊列,它可以在一個單獨的線程中調用 plotly.write函數。下面是程序代碼。
classTaskQueue(Queue.Queue):
首先我們繼承 Queue.Queue 類。從 Queue.Queue 類可以繼承 get 和 put 方法,以及隊列的行為。
def__init__(self, num_workers=1): Queue.Queue.__init__(self) self.num_workers=num_workers self.start_workers()
初始化的時候,我們可以不用考慮工作線程的數量。
defadd_task(self, task,*args,**kwargs): args=argsor() kwargs=kwargsor{} self.put((task, args, kwargs))
我們把 task, args, kwargs 以元組的形式存儲在隊列中。*args 可以傳遞數量不等的參數,**kwargs 可以傳遞命名參數。
defstart_workers(self): foriinrange(self.num_workers): t=Thread(target=self.worker) t.daemon=True t.start()
我們為每個 worker 創建一個線程,然後在後台刪除。
下面是 worker 函數的代碼:
defworker(self): whileTrue: tupl=self.get() item, args, kwargs=self.get() item(*args,**kwargs) self.task_done()
worker 函數獲取隊列頂端的任務,並根據輸入參數運行,除此之外,沒有其他的功能。下面是隊列的代碼:
我們可以通過下面的代碼測試:
defblokkah(*args,**kwargs): time.sleep(5) print「Blokkah mofo!」 q=TaskQueue(num_workers=5) foriteminrange(1): q.add_task(blokkah) q.join()# wait for all the tasks to finish. print「Alldone!」
Blokkah 是我們要做的任務名稱。隊列已經緩存在內存中,並且沒有執行很多任務。下面的步驟是把主隊列當做單獨的進程來運行,這樣主程序退出以及執行資料庫持久化時,隊列任務不會停止運行。但是這個例子很好地展示了如何從一個很簡單的小任務寫成像工作隊列這樣復雜的程序。
defgradient_descent(): # the gradient descent code queue.add_task(plotly.write, x=X, y=Y)
修改之後,我的梯度下降演算法工作效率似乎更高了。如果你很感興趣的話,可以參考下面的代碼。 classTaskQueue(Queue.Queue): def__init__(self, num_workers=1):Queue.Queue.__init__(self)self.num_workers=num_workersself.start_workers() defadd_task(self, task,*args,**kwargs):args=argsor()kwargs=kwargsor{}self.put((task, args, kwargs)) defstart_workers(self):foriinrange(self.num_workers):t=Thread(target=self.worker)t.daemon=Truet.start() defworker(self):whileTrue:tupl=self.get()item, args, kwargs=self.get()item(*args,**kwargs)self.task_done() deftests():defblokkah(*args,**kwargs):time.sleep(5)print"Blokkah mofo!" q=TaskQueue(num_workers=5) foriteminrange(10):q.add_task(blokkah) q.join()# block until all tasks are doneprint"All done!" if__name__=="__main__":tests()
『貳』 python實現堆棧與隊列的方法
python實現堆棧與隊列的方法
本文實例講述了python實現堆棧與隊列的方法。分享給大家供大家參考。具體分析如下:
1、python實現堆棧,可先將Stack類寫入文件stack.py,在其它程序文件中使用from stack import Stack,然後就可以使用堆棧了。
stack.py的程序:
代碼如下:class Stack():
def __init__(self,size):
self.size=size;
self.stack=[];
self.top=-1;
def push(self,ele): #入棧之前檢查棧是否已滿
if self.isfull():
raise exception("out of range");
else:
self.stack.append(ele);
self.top=self.top+1;
def pop(self): # 出棧之前檢查棧是否為空
if self.isempty():
raise exception("stack is empty");
else:
self.top=self.top-1;
return self.stack.pop();
def isfull(self):
return self.top+1==self.size;
def isempty(self):
return self.top==-1;
再寫一個程序文件,stacktest.py,使用棧,內容如下:
代碼如下:#!/usr/bin/python
from stack import Stack
s=Stack(20);
for i in range(3):
s.push(i);
s.pop()
print s.isempty();
2、python 實現隊列:
復制代碼代碼如下:class Queue():
def __init__(self,size):
self.size=size;
self.front=-1;
self.rear=-1;
self.queue=[];
def enqueue(self,ele): #入隊操作
if self.isfull():
raise exception("queue is full");
else:
self.queue.append(ele);
self.rear=self.rear+1;
def dequeue(self): #出隊操作
if self.isempty():
raise exception("queue is empty");
else:
self.front=self.front+1;
return self.queue[self.front];
def isfull(self):
return self.rear-self.front+1==self.size;
def isempty(self):
return self.front==self.rear;
q=Queue(10);
for i in range(3):
q.enqueue(i);
print q.dequeue();
print q.isempty();
希望本文所述對大家的Python程序設計有所幫助。
『叄』 python基礎(21)-線程通信
到這里,我們要聊一下線程通信的內容;
首先,我們拋開語言不談,先看看比較基礎的東西,線程間通信的方式;其實也就是哪幾種(我這里說的,是我的所謂的知道的。。。)事件,消息隊列,信號量,條件變數(鎖算不算?我只是認為是同步的一種);所以我們也就是要把這些掌握了,因為各有各的好處嘛;
條件變數我放到了上面的線程同步裡面講了,我總感覺這算是同步的一種,沒有很多具體信息的溝通;同時吧,我認為條件變數比較重要,因為這種可以應用於線程池的操作上;所以比較重要;這里,拋開條件變數不談,我們看看其他的東西;
1、消息隊列:
queue 模塊下提供了幾個阻塞隊列,這些隊列主要用於實現線程通信。在 queue 模塊下主要提供了三個類,分別代表三種隊列,它們的主要區別就在於進隊列、出隊列的不同。
關於這三個隊列類的簡單介紹如下:
queue.Queue(maxsize=0):代表 FIFO(先進先出)的常規隊列,maxsize 可以限制隊列的大小。如果隊列的大小達到隊列的上限,就會加鎖,再次加入元素時就會被阻塞,直到隊列中的元素被消費。如果將 maxsize 設置為 0 或負數,則該隊列的大小就是無限制的。
queue.LifoQueue(maxsize=0):代表 LIFO(後進先出)的隊列,與 Queue 的區別就是出隊列的順序不同。
PriorityQueue(maxsize=0):代表優先順序隊列,優先順序最小的元素先出隊列。
這三個隊列類的屬性和方法基本相同, 它們都提供了如下屬性和方法:
Queue.qsize():返回隊列的實際大小,也就是該隊列中包含幾個元素。
Queue.empty():判斷隊列是否為空。
Queue.full():判斷隊列是否已滿。
Queue.put(item, block=True, timeout=None):向隊列中放入元素。如果隊列己滿,且 block 參數為 True(阻塞),當前線程被阻塞,timeout 指定阻塞時間,如果將 timeout 設置為 None,則代表一直阻塞,直到該隊列的元素被消費;如果隊列己滿,且 block 參數為 False(不阻塞),則直接引發 queue.FULL 異常。
Queue.put_nowait(item):向隊列中放入元素,不阻塞。相當於在上一個方法中將 block 參數設置為 False。
Queue.get(item, block=True, timeout=None):從隊列中取出元素(消費元素)。如果隊列已滿,且 block 參數為 True(阻塞),當前線程被阻塞,timeout 指定阻塞時間,如果將 timeout 設置為 None,則代表一直阻塞,直到有元素被放入隊列中; 如果隊列己空,且 block 參數為 False(不阻塞),則直接引發 queue.EMPTY 異常。
Queue.get_nowait(item):從隊列中取出元素,不阻塞。相當於在上一個方法中將 block 參數設置為 False。
其實我們想想,這個隊列,是python進行封裝的,那麼我們可以用在線程間的通信;同時也是可以用做一個數據結構;先進先出就是隊列,後進先出就是棧;我們用這個棧寫個十進制轉二進制的例子:
沒毛病,可以正常的列印;其中需要注意的就是,maxsize在初始化的時候如果是0或者是個負數的話,那麼就會是不限制大小;
那麼其實我們想想,我們如果用做線程通信的話,我們兩個線程,可以把隊列設置為1的大小,如果是1對多,比如是創建者和消費者的關系,我們完全可以作為消息隊列,比如說創建者一直在創建一些東西,然後放入到消息隊列裡面,然後供消費著使用;就是一個很好的例子;所以,其實說是消息隊列,也就是隊列,沒差;
=====================================================================
下面來看一下事件
Event 是一種非常簡單的線程通信機制,一個線程發出一個 Event,另一個線程可通過該 Event 被觸發。
Event 本身管理一個內部旗標,程序可以通過 Event 的 set() 方法將該旗標設置為 True,也可以調用 clear() 方法將該旗標設置為 False。程序可以調用 wait() 方法來阻塞當前線程,直到 Event 的內部旗標被設置為 True。
Event 提供了如下方法:
is_set():該方法返回 Event 的內部旗標是否為True。
set():該方法將會把 Event 的內部旗標設置為 True,並喚醒所有處於等待狀態的線程。
clear():該方法將 Event 的內部旗標設置為 False,通常接下來會調用 wait() 方法來阻塞當前線程。
wait(timeout=None):該方法會阻塞當前線程。
這里我想解釋一下;其實對於事件來說,事件可以看成和條件變數是一樣的,只是我們說說不一樣的地方;
1、對於事件來說,一旦觸發了事件,也就是說,一旦set為true了,那麼就會一直為true,需要clear調內部的標志,才能繼續wait;但是conditon不是,他是一次性的喚醒其他線程;
2、conditon自己帶鎖;事件呢?不是的;沒有自己的鎖;比如說有一個存錢的線程,有一個是取錢的線程;那麼存錢的線程要存錢;需要怎麼辦呢?1、發現銀行沒有錢了(is_set判斷);2、鎖住銀行;3、存錢;4、釋放銀行;5、喚醒事件;對於取錢的人;1、判斷是否有錢;2、被喚醒了,然後鎖住銀行;3、開始取錢;4、清理告訴存錢的人,我沒錢了(clear);5、釋放鎖;6、等著錢存進去;
其實說白了,就是記住一點;這個旗標需要自己clear就對了
寫個例子,怕以後忘了怎麼用;
其實時間和信號量比較像;但是信號量不用自己清除標志位;但是事件是需要的;
『肆』 Python數據結構-隊列與廣度優先搜索(Queue)
隊列(Queue) :簡稱為隊,一種線性表數據結構,是一種只允許在表的一端進行插入操作,而在表的另一端進行刪除操作的線性表。
我們把隊列中允許插入的一端稱為 「隊尾(rear)」 ;把允許刪除的另一端稱為 「隊頭(front)」 。當表中沒有任何數據元素時,稱之為 「空隊」 。
廣度優先搜索演算法(Breadth First Search) :簡稱為 BFS,又譯作寬度優先搜索 / 橫向優先搜索。是一種用於遍歷或搜索樹或圖的演算法。該演算法從根節點開始,沿著樹的寬度遍歷樹或圖的節點。如果所有節點均被訪問,則演算法中止。
廣度優先遍歷 類似於樹的層次遍歷過程 。呈現出一層一層向外擴張的特點。先看到的節點先訪問,後看到的節點後訪問。遍歷到的節點順序符合「先進先出」的特點,所以廣度優先搜索可以通過「隊列」來實現。
力扣933
游戲時,隊首始終是持有土豆的人
模擬游戲開始,隊首的人出隊,之後再到隊尾(類似於循環隊列)
傳遞了num次之後,將隊首的人移除
如此反復,直到隊列中剩餘一人
多人共用一台列印機,採取「先到先服務」的隊列策略來執行列印任務
需要解決的問題:1 列印系統的容量是多少?2 在能夠接受的等待時間內,系統可容納多少用戶以多高的頻率提交列印任務?
輸入:abba
輸出:False
思路:1 先將需要判定的詞從隊尾加入 deque; 2從兩端同時移除字元並判斷是否相同,直到deque中剩餘0個(偶數)或1個字元(奇數)
內容參考: https://algo.itcharge.cn/04.%E9%98%9F%E5%88%97/01.%E9%98%9F%E5%88%97%E5%9F%BA%E7%A1%80%E7%9F%A5%E8%AF%86/01.%E9%98%9F%E5%88%97%E5%9F%BA%E7%A1%80%E7%9F%A5%E8%AF%86/
『伍』 python生成多個隊列
q=[]
foriinrange(9):
q.append(Queue())
『陸』 python中棧和隊列在功能上的區別
「棧」
和
「隊列」
是數據結構,與具體的語言無關。
1.隊列先進先出,棧先進後出。
2.
對插入和刪除操作的"限定"。
棧是限定只能在表的一端進行插入和刪除操作的線性表。
隊列是限定只能在表的一端進行插入和在另一端進行刪除操作的線性表。
從"數據結構"的角度看,它們都是線性結構,即數據元素之間的關系相同。但它們是完全不同的數據類型。除了它們各自的基本操作集不同外,主要區別是對插入和刪除操作的"限定"。
棧和隊列是在程序設計中被廣泛使用的兩種線性數據結構,它們的特點在於基本操作的特殊性,棧必須按"後進先出"的規則進行操作,而隊列必須按"先進先出"
的規則進行操作。和線性表相比,它們的插入和刪除操作受更多的約束和限定,故又稱為限定性的線性表結構。
3.遍歷數據速度不同。棧只能從頭部取數據
也就最先放入的需要遍歷整個棧最後才能取出來,而且在遍歷數據的時候還得為數據開辟臨時空間,保持數據在遍歷前的一致性隊列怎不同,他基於地址指針進行遍歷,而且可以從頭或尾部開始遍歷,但不能同時遍歷,無需開辟臨時空間,因為在遍歷的過程中不影像數據結構,速度要快的多
棧(stack)是限定只能在表的一端進行插入和刪除操作的線性表。
隊列(queue)是限定只能在表的一端進行插入和在另一端進行刪除操作的線性表。
從"數據結構"的角度看,它們都是線性結構,即數據元素之間的關系相同。但它們是完全不同的數據類型。除了它們各自的基本操作集不同外,主要區別是對插入和刪除操作的"限定"。
棧和隊列是在程序設計中被廣泛使用的兩種線性數據結構,它們的特點在於基本操作的特殊性,棧必須按"後進先出"的規則進行操作,而隊列必須按"先進先出"的規則進行操作。和線性表相比,它們的插入和刪除操作受更多的約束和限定,故又稱為限定性的線性表結構。
『柒』 Python 隊列queue與多線程組合(生產者+消費者模式)
在線程世界⾥,⽣產者就是⽣產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果⽣產者處理速度很快,⽽消費者處理速度很慢,那麼⽣產者就必須等待消費者處理完,才能繼續⽣產數據。同樣的道理,如果消費者的處理能⼒⼤於⽣產者,那麼消費者就必須等待⽣產者。為了解決這個問題於是引⼊了⽣產者和消費者模式。
⽣產者消費者模式是通過⼀個容器來解決⽣產者和消費者的強耦合問題。⽣產者和消費者彼此之間不直接通訊,⽽通過阻塞隊列來進⾏通訊,所以⽣產者⽣產完數據之後不⽤等待消費者處理,直接扔給阻塞隊列,消費者不找⽣產者要數據,⽽是唯租直接從阻塞隊列⾥取,阻塞隊列就相當於⼀個緩沖區,平衡了⽣產者和消費者的處理能⼒。
比如,對於同時爬取多個網頁的多線程爬蟲,在某一時刻你可能無法保證他們在處理不同的網站,在某些時刻他們極有可能在處理相同的網站,這豈不浪費?為了解決這個問題,可以將不同網頁的url放在queue中,然後多個線程來讀取queue中的url進行解析處理,而queue只允許一次出一個,出一個少一個。相同網站上不同網頁的url通常有某種規律,比如某個欄位的數字加1,這種情況完全可以用這種模式,「生產者程序」負責根據規律把完整的url製作出來,再塞進queue裡面(如果queue滿了,則等待);「消費者程序(網頁解析程序)」從queue的後面答團挨個取出url進行解析(如果queue裡面是空的,則等待),即使是多線程也能保證每個線程得到的是不同的url。這個過程中,生產者和消費彼此互不幹涉。
下面以實例說明如何將queue與多線程相結合形成所謂的「 生產者+消費者 」模式,同時解決 多線程如何退出 的問題(注意下例中是「一個生產者+多個消費者」的形式,多生產者+多消費者的模式可在此基礎上進一步實現):
上述程序的過程如下圖:
注意 :
(1)上述程序中生產者插入queue的時間間隔為0.1s,而消費者的取出時間間隔為2s,顯然消費速度不如生產速度,一開始queue是空的,一段時間後queue就變滿了,輸出結果正說明了這一點。如果將兩個時間調換,則結果相反,queue永遠不會滿,甚至只有1個值,因為只要進去就被消費了。
(2)消費者程序是通過「while」來推動不斷執行的,何時結束?上例中通過在queue中增加None的形式告訴消費者,生產者已經結束了,消費者也可以結束了。但消費者有多個,到底由哪個消費者得到None?為解決這個問題,上例中在消費者中先判斷當前取出的是不是None,如果是,則先在queue里插入一個None,然後再break當前這個消費者線程,最後的結果是所有的消費者線程都退出了,但queue中還剩下None沒有被取出。因此在程序的後面增加了一個for循環來挨個把queue中的元素取出,否則最後的q.join()將永遠阻塞,程序無法往下執行。
(3)程序中每一個q.get()後面都跟有一個q.task_done(),其作用指舉兆是從queue中取出一個元素就給q.join()發送一個信息,否則q.join()將永遠處於阻塞狀態,直到所有queue元素都被取出。
多線程「生產者-消費者」模式一般性結構圖