❶ python多線程的幾種方法
Python進階(二十六)-多線程實現同步的四種方式
臨界資源即那些一次只能被一個線程訪問的資源,典型例子就是列印機,它一次只能被一個程序用來執行列印功能,因為不能多個線程同時操作,而訪問這部分資源的代碼通常稱之為臨界區。
鎖機制
threading的Lock類,用該類的acquire函數進行加鎖,用realease函數進行解鎖
import threadingimport timeclass Num:
def __init__(self):
self.num = 0
self.lock = threading.Lock() def add(self):
self.lock.acquire()#加鎖,鎖住相應的資源
self.num += 1
num = self.num
self.lock.release()#解鎖,離開該資源
return num
n = Num()class jdThread(threading.Thread):
def __init__(self,item):
threading.Thread.__init__(self)
self.item = item def run(self):
time.sleep(2)
value = n.add()#將num加1,並輸出原來的數據和+1之後的數據
print(self.item,value)for item in range(5):
t = jdThread(item)
t.start()
t.join()#使線程一個一個執行
當一個線程調用鎖的acquire()方法獲得鎖時,鎖就進入「locked」狀態。每次只有一個線程可以獲得鎖。如果此時另一個線程試圖獲得這個鎖,該線程就會變為「blocked」狀態,稱為「同步阻塞」(參見多線程的基本概念)。
直到擁有鎖的線程調用鎖的release()方法釋放鎖之後,鎖進入「unlocked」狀態。線程調度程序從處於同步阻塞狀態的線程中選擇一個來獲得鎖,並使得該線程進入運行(running)狀態。
信號量
信號量也提供acquire方法和release方法,每當調用acquire方法的時候,如果內部計數器大於0,則將其減1,如果內部計數器等於0,則會阻塞該線程,知道有線程調用了release方法將內部計數器更新到大於1位置。
import threadingimport timeclass Num:
def __init__(self):
self.num = 0
self.sem = threading.Semaphore(value = 3) #允許最多三個線程同時訪問資源
def add(self):
self.sem.acquire()#內部計數器減1
self.num += 1
num = self.num
self.sem.release()#內部計數器加1
return num
n = Num()class jdThread(threading.Thread):
def __init__(self,item):
threading.Thread.__init__(self)
self.item = item def run(self):
time.sleep(2)
value = n.add()
print(self.item,value)for item in range(100):
❷ 如何多線程(多進程)加速while循環(語言-python)
import numpy as np
import os
import sys
import multiprocessing as mp
import time
def MCS(input_data, med):
#t1 = time.perf_counter()
left = 0
lp = 0
while True:
lp = lp + 1
data_pool = input_data + left
output_data = med * 0.05 * data_pool / (10000 + med)
output_data = np.where(output_data > data_pool, data_pool, output_data)
left = data_pool - output_data
cri = (input_data - output_data) / input_data * 100
#print(lp, data_pool, output_data, cri)
if cri <= 1:
break
t2 = time.perf_counter()
#print(f'Finished in {t2 - t1} seconds')
if __name__ == "__main__":
pool = mp.Pool(processes=5)
tasks = []
for i in np.linspace(0.4, 0.6, num = 10):
tasks.append([100, i])
t1 = time.perf_counter()
pool.starmap(MCS, tasks)
#pool.apply_async(MCS, args=(100, 0.4))
t2 = time.perf_counter()
#pool.join()
#pool.close()
for i in np.linspace(0.4, 0.6, num = 10):
MCS(100, i)
t3 = time.perf_counter()
print(f'Finished in {t2 - t1} seconds')
print(f'Finished in {t3 - t2} seconds')
原因可能是只運行了一個例子,
如圖測試了10個例子,測試結果如下
Finished in 15.062450630997773 seconds
Finished in 73.1936681799998 seconds
並行確實有一定的加速。
❸ Python爬蟲實戰,Python多線程抓取5千多部最新電影下載鏈接
利用Python多線程爬了5000多部最新電影下載鏈接,廢話不多說~
讓我們愉快地開始吧~
Python版本: 3.6.4
相關模塊:
requests模塊;
re模塊;
csv模塊;
以及一些Python自帶的模塊。
安裝Python並添加到環境變數,pip安裝需要的相關模塊即可。
拿到鏈接之後,接下來就是繼續訪問這些鏈接,然後拿到電影的下載鏈接
但是這里還是有很多的小細節,例如我們需要拿到電影的總頁數,其次這么多的頁面,一個線程不知道要跑到什麼時候,所以我們首先先拿到總頁碼,然後用多線程來進行任務的分配
我們首先先拿到總頁碼,然後用多線程來進行任務的分配
總頁數其實我們用re正則來獲取
爬取的內容存取到csv,也可以寫個函數來存取
開啟4個進程來下載鏈接
您學廢了嗎?最後祝大家天天進步!!學習Python最重要的就是心態。我們在學習過程中必然會遇到很多難題,可能自己想破腦袋都無法解決。這都是正常的,千萬別急著否定自己,懷疑自己。如果大家在剛開始學習中遇到困難,想找一個python學習交流環境,可以加入我們,領取學習資料,一起討論,會節約很多時間,減少很多遇到的難題。
❹ python循環怎麼用多線程去運行
背景:Python腳本:讀取文件中每行,放入列表中;循環讀取列表中的每個元素,並做處理操作。
核心:多線程處理單個for循環函數調用
模塊:threading
第一部分:
:多線程腳本 (該腳本只有兩個線程,t1循環次數<t2)#!/usr/bin/env python#-*- coding: utf8 -*- import sysimport timeimport stringimport threadingimport datetimefileinfo = sys.argv[1] # 讀取文件內容放入列表host_list = []port_list = [] # 定義函數:讀取文件內容放入列表中def CreateList(): f = file(fileinfo,'r') for line in f.readlines(): host_list.append(line.split(' ')[0]) port_list.append(line.split(' ')[1]) return host_list return port_list f.close() # 單線程 循環函數,注釋掉了#def CreateInfo(): # for i in range(0,len(host_list)): # 單線程:直接循環列表# time.sleep(1)# TimeMark = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')# print "The Server's HostName is %-15s and Port is %-4d !!! [%s]" % (host_list[i],int(port_list[i]),TimeMark)# # 定義多線程循環調用函數def MainRange(start,stop): #提供列表index起始位置參數 for i in range(start,stop): time.sleep(1) TimeMark = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print "The Server's HostName is %-15s and Port is %-4d !!! [%s]" % (host_list[i],int(port_list[i]),TimeMark) # 執行函數,生成列表CreateList()# 列表分割成:兩部分 mid為列表的index中間位置mid = int(len(host_list)/2) # 多線程部分threads = []t1 = threading.Thread(target=MainRange,args=(0,mid))threads.append(t1)t2 = threading.Thread(target=MainRange,args=(mid,len(host_list)))threads.append(t2) for t in threads: t.setDaemon(True) t.start()t.join()print "ok"
以上是腳本內容!!!
----------------------------------------------------------------------
:讀取文件的內容
文件內容:
[root@monitor2 logdb]# cat hostinfo.txt
192.168.10.11 1011
192.168.10.12 1012
192.168.10.13 1013
192.168.10.14 1014
192.168.10.15 1015
192.168.10.16 1016
192.168.10.17 1017
192.168.10.18 1018
192.168.10.19 1019
192.168.10.20 1020
192.168.10.21 1021
192.168.10.22 1022
192.168.10.23 1023
192.168.10.24 1024
192.168.10.25 1025
:輸出結果:
單線程 : 執行腳本:輸出結果:
[root@monitor2 logdb]# ./Threadfor.py hostinfo.txt
The Server's HostName is 192.168.10.10 and Port is 1010 !!! [2017-01-10 14:25:14]
The Server's HostName is 192.168.10.11 and Port is 1011 !!! [2017-01-10 14:25:15]
The Server's HostName is 192.168.10.12 and Port is 1012 !!! [2017-01-10 14:25:16]
.
.
.
The Server's HostName is 192.168.10.25 and Port is 1025 !!! [2017-01-10 14:25:29]
多線程:執行腳本:輸出 結果
[root@monitor2 logdb]# ./Threadfor.py hostinfo.txt
The Server's HostName is 192.168.10.11 and Port is 1011 !!! [2017-01-10 14:51:51]
The Server's HostName is 192.168.10.18 and Port is 1018 !!! [2017-01-10 14:51:51]
The Server's HostName is 192.168.10.12 and Port is 1012 !!! [2017-01-10 14:51:52]
The Server's HostName is 192.168.10.19 and Port is 1019 !!! [2017-01-10 14:51:52]
The Server's HostName is 192.168.10.13 and Port is 1013 !!! [2017-01-10 14:51:53]
The Server's HostName is 192.168.10.20 and Port is 1020 !!! [2017-01-10 14:51:53]
The Server's HostName is 192.168.10.14 and Port is 1014 !!! [2017-01-10 14:51:54]
The Server's HostName is 192.168.10.21 and Port is 1021 !!! [2017-01-10 14:51:54]
The Server's HostName is 192.168.10.15 and Port is 1015 !!! [2017-01-10 14:51:55]
The Server's HostName is 192.168.10.22 and Port is 1022 !!! [2017-01-10 14:51:55]
The Server's HostName is 192.168.10.16 and Port is 1016 !!! [2017-01-10 14:51:56]
The Server's HostName is 192.168.10.23 and Port is 1023 !!! [2017-01-10 14:51:56]
The Server's HostName is 192.168.10.17 and Port is 1017 !!! [2017-01-10 14:51:57]
The Server's HostName is 192.168.10.24 and Port is 1024 !!! [2017-01-10 14:51:57]
The Server's HostName is 192.168.10.25 and Port is 1025 !!! [2017-01-10 14:51:58]
❺ python 怎麼多線程遍歷list
可以對第二個list的元素進行遍歷,檢查是否出現在第二個list當中,如果使用表理解,可以使用一行代碼完成任務。 list1 = [1,2,3,4,5] list2 = [4,5,6,7,8] print [l for l in list1 if l in list2] # [4,5] 如果每一個列表中均沒有重復的元素,...
❻ python 怎麼實現多線程的
線程也就是輕量級的進程,多線程允許一次執行多個線程,Python是多線程語言,它有一個多線程包,GIL也就是全局解釋器鎖,以確保一次執行單個線程,一個線程保存GIL並在將其傳遞給下一個線程之前執行一些操作,也就產生了並行執行的錯覺。
❼ Python實現簡單多線程任務隊列
Python實現簡單多線程任務隊列
最近我在用梯度下降演算法繪制神經網路的數據時,遇到了一些演算法性能的問題。梯度下降演算法的代碼如下(偽代碼):
defgradient_descent(): # the gradient descent code plotly.write(X, Y)
一般來說,當網路請求 plot.ly 繪圖時會阻塞等待返回,於是也會影響到其他的梯度下降函數的執行速度。
一種解決辦法是每調用一次 plotly.write 函數就開啟一個新的線程,但是這種方法感覺不是很好。 我不想用一個像 cerely(一種分布式任務隊列)一樣大而全的任務隊列框架,因為框架對於我的這點需求來說太重了,並且我的繪圖也並不需要 redis 來持久化數據。
那用什麼辦法解決呢?我在 python 中寫了一個很小的任務隊列,它可以在一個單獨的線程中調用 plotly.write函數。下面是程序代碼。
classTaskQueue(Queue.Queue):
首先我們繼承 Queue.Queue 類。從 Queue.Queue 類可以繼承 get 和 put 方法,以及隊列的行為。
def__init__(self, num_workers=1): Queue.Queue.__init__(self) self.num_workers=num_workers self.start_workers()
初始化的時候,我們可以不用考慮工作線程的數量。
defadd_task(self, task,*args,**kwargs): args=argsor() kwargs=kwargsor{} self.put((task, args, kwargs))
我們把 task, args, kwargs 以元組的形式存儲在隊列中。*args 可以傳遞數量不等的參數,**kwargs 可以傳遞命名參數。
defstart_workers(self): foriinrange(self.num_workers): t=Thread(target=self.worker) t.daemon=True t.start()
我們為每個 worker 創建一個線程,然後在後台刪除。
下面是 worker 函數的代碼:
defworker(self): whileTrue: tupl=self.get() item, args, kwargs=self.get() item(*args,**kwargs) self.task_done()
worker 函數獲取隊列頂端的任務,並根據輸入參數運行,除此之外,沒有其他的功能。下面是隊列的代碼:
我們可以通過下面的代碼測試:
defblokkah(*args,**kwargs): time.sleep(5) print「Blokkah mofo!」 q=TaskQueue(num_workers=5) foriteminrange(1): q.add_task(blokkah) q.join()# wait for all the tasks to finish. print「Alldone!」
Blokkah 是我們要做的任務名稱。隊列已經緩存在內存中,並且沒有執行很多任務。下面的步驟是把主隊列當做單獨的進程來運行,這樣主程序退出以及執行資料庫持久化時,隊列任務不會停止運行。但是這個例子很好地展示了如何從一個很簡單的小任務寫成像工作隊列這樣復雜的程序。
defgradient_descent(): # the gradient descent code queue.add_task(plotly.write, x=X, y=Y)
修改之後,我的梯度下降演算法工作效率似乎更高了。如果你很感興趣的話,可以參考下面的代碼。 classTaskQueue(Queue.Queue): def__init__(self, num_workers=1):Queue.Queue.__init__(self)self.num_workers=num_workersself.start_workers() defadd_task(self, task,*args,**kwargs):args=argsor()kwargs=kwargsor{}self.put((task, args, kwargs)) defstart_workers(self):foriinrange(self.num_workers):t=Thread(target=self.worker)t.daemon=Truet.start() defworker(self):whileTrue:tupl=self.get()item, args, kwargs=self.get()item(*args,**kwargs)self.task_done() deftests():defblokkah(*args,**kwargs):time.sleep(5)print"Blokkah mofo!" q=TaskQueue(num_workers=5) foriteminrange(10):q.add_task(blokkah) q.join()# block until all tasks are doneprint"All done!" if__name__=="__main__":tests()
❽ Python多線程總結
在實際處理數據時,因系統內存有限,我們不可能一次把所有數據都導出進行操作,所以需要批量導出依次操作。為了加快運行,我們會採用多線程的方法進行數據處理, 以下為我總結的多線程批量處理數據的模板:
主要分為三大部分:
共分4部分對多線程的內容進行總結。
先為大家介紹線程的相關概念:
在飛車程序中,如果沒有多線程,我們就不能一邊聽歌一邊玩飛車,聽歌與玩 游戲 不能並行;在使用多線程後,我們就可以在玩 游戲 的同時聽背景音樂。在這個例子中啟動飛車程序就是一個進程,玩 游戲 和聽音樂是兩個線程。
Python 提供了 threading 模塊來實現多線程:
因為新建線程系統需要分配資源、終止線程系統需要回收資源,所以如果可以重用線程,則可以減去新建/終止的開銷以提升性能。同時,使用線程池的語法比自己新建線程執行線程更加簡潔。
Python 為我們提供了 ThreadPoolExecutor 來實現線程池,此線程池默認子線程守護。它的適應場景為突發性大量請求或需要大量線程完成任務,但實際任務處理時間較短。
其中 max_workers 為線程池中的線程個數,常用的遍歷方法有 map 和 submit+as_completed 。根據業務場景的不同,若我們需要輸出結果按遍歷順序返回,我們就用 map 方法,若想誰先完成就返回誰,我們就用 submit+as_complete 方法。
我們把一個時間段內只允許一個線程使用的資源稱為臨界資源,對臨界資源的訪問,必須互斥的進行。互斥,也稱間接制約關系。線程互斥指當一個線程訪問某臨界資源時,另一個想要訪問該臨界資源的線程必須等待。當前訪問臨界資源的線程訪問結束,釋放該資源之後,另一個線程才能去訪問臨界資源。鎖的功能就是實現線程互斥。
我把線程互斥比作廁所包間上大號的過程,因為包間里只有一個坑,所以只允許一個人進行大號。當第一個人要上廁所時,會將門上上鎖,這時如果第二個人也想大號,那就必須等第一個人上完,將鎖解開後才能進行,在這期間第二個人就只能在門外等著。這個過程與代碼中使用鎖的原理如出一轍,這里的坑就是臨界資源。 Python 的 threading 模塊引入了鎖。 threading 模塊提供了 Lock 類,它有如下方法加鎖和釋放鎖:
我們會發現這個程序只會列印「第一道鎖」,而且程序既沒有終止,也沒有繼續運行。這是因為 Lock 鎖在同一線程內第一次加鎖之後還沒有釋放時,就進行了第二次 acquire 請求,導致無法執行 release ,所以鎖永遠無法釋放,這就是死鎖。如果我們使用 RLock 就能正常運行,不會發生死鎖的狀態。
在主線程中定義 Lock 鎖,然後上鎖,再創建一個子 線程t 運行 main 函數釋放鎖,結果正常輸出,說明主線程上的鎖,可由子線程解鎖。
如果把上面的鎖改為 RLock 則報錯。在實際中設計程序時,我們會將每個功能分別封裝成一個函數,每個函數中都可能會有臨界區域,所以就需要用到 RLock 。
一句話總結就是 Lock 不能套娃, RLock 可以套娃; Lock 可以由其他線程中的鎖進行操作, RLock 只能由本線程進行操作。