導航:首頁 > 源碼編譯 > 線程池源碼分析

線程池源碼分析

發布時間:2023-10-10 09:37:30

Ⅰ 合理使用線程池以及線程變數

背景

隨著計算技術的不斷發展,3納米製程晶元已進入試產階段,摩爾定律在現有工藝下逐漸面臨巨大的物理瓶頸,通過多核處理器技術來提升伺服器的性能成為提升算力的主要方向。

在伺服器領域,基於java構建的後端伺服器占據著領先地位,因此,掌握java並發編程技術,充分利用CPU的並發處理能力是一個開發人員必修的基本功,本文結合線程池源碼和實踐,簡要介紹了線程池和線程變數的使用。

線程池概述

線程池是一種「池化」的線程使用模式,通過創建一定數量的線程,讓這些線程處於就緒狀態來提高系統響應速度,在線程使用完成後歸還到線程池來達到重復利用的目標,從而降低系統資源的消耗。

總體來說,線程池有如下的優勢:

線程池的使用

在java中,線程池的實現類是ThreadPoolExecutor,構造函數如下:

可以通過 new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,handler)來創建一個線程池。

在構造函數中,corePoolSize為線程池核心線程數。默認情況下,核心線程會一直存活,但是當將allowCoreThreadTimeout設置為true時,核心線程超時也會回收。

在構造函數中,maximumPoolSize為線程池所能容納的最首高模大線程數。

在構造函數中,keepAliveTime表示線程閑置超時時長。如果線程閑置時間超過該時長,非核心線程就會被回收。如果將allowCoreThreadTimeout設置為true時,核心線程也會超時回收。

在構造函數中,timeUnit表示線程閑置超時時長的時間單位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。

在構造函數中,blockingQueue表示任務隊列,線程池任務隊列的常用實現類有:

在構造函數中,threadFactory表示線程工廠。用於指定為線程池創建新線程的方式,threadFactory可以設置線程名稱、線程組、優先順序等參數。如通過Google工具包可以設置線程池裡的線程名:

在構造函數中,rejectedExecutionHandler表示拒絕策略。當達到最大線程數且隊列任務已滿時需要執行的拒絕策略,常見的拒絕策略如下:


ThreadPoolExecutor線程池有如下幾種狀態:



線程池提交一個任務時任務調度的主要步驟如下:

核心代碼如下:


Tomcat 的整體架構包含連接器和容器兩大部分,其中連接器負責與外部通信,容器負責內部邏輯處理。在連接器中:

Tomcat為了實現請者緩求的快念鉛速響應,使用線程池來提高請求的處理能力。下面我們以HTTP非阻塞I/O為例對Tomcat線程池進行簡要的分析。

在Tomcat中,通過AbstractEndpoint類提供底層的網路I/O的處理,若用戶沒有配置自定義公共線程池,則AbstractEndpoint通過createExecutor方法來創建Tomcat默認線程池。

核心部分代碼如下:

其中,TaskQueue、ThreadPoolExecutor分別為Tomcat自定義任務隊列、線程池實現。

Tomcat自定義線程池繼承於java.util.concurrent.ThreadPoolExecutor,並新增了一些成員變數來更高效地統計已經提交但尚未完成的任務數量(submittedCount),包括已經在隊列中的任務和已經交給工作線程但還未開始執行的任務。

Tomcat在自定義線程池ThreadPoolExecutor中重寫了execute()方法,並實現對提交執行的任務進行submittedCount加一。Tomcat在自定義ThreadPoolExecutor中,當線程池拋出RejectedExecutionException異常後,會調用force()方法再次向TaskQueue中進行添加任務的嘗試。如果添加失敗,則submittedCount減一後,再拋出RejectedExecutionException。

在Tomcat中重新定義了一個阻塞隊列TaskQueue,它繼承於LinkedBlockingQueue。在Tomcat中,核心線程數默認值為10,最大線程數默認為200, 為了避免線程到達核心線程數後後續任務放入隊列等待,Tomcat通過自定義任務隊列TaskQueue重寫offer方法實現了核心線程池數達到配置數後線程的創建。

具體地,從線程池任務調度機制實現可知,當offer方法返回false時,線程池將嘗試創建新新線程,從而實現任務的快速響應。TaskQueue核心實現代碼如下:

Tomcat中通過自定義任務線程TaskThread實現對每個線程創建時間的記錄;使用靜態內部類WrappingRunnable對Runnable進行包裝,用於對StopPooledThreadException異常類型的處理。

Executors常用方法有以下幾個:

Executors類看起來功能比較強大、用起來還比較方便,但存在如下弊端

使用線程時,可以直接調用 ThreadPoolExecutor 的構造函數來創建線程池,並根據業務實際場景來設置corePoolSize、blockingQueue、RejectedExecuteHandler等參數。

使用局部線程池時,若任務執行完後沒有執行shutdown()方法或有其他不當引用,極易造成系統資源耗盡。

在工程實踐中,通常使用下述公式來計算核心線程數:

nThreads=(w+c)/c*n*u=(w/c+1)*n*u

其中,w為等待時間,c為計算時間,n為CPU核心數(通常可通過 Runtime.getRuntime().availableProcessors()方法獲取),u為CPU目標利用率(取值區間為[0, 1]);在最大化CPU利用率的情況下,當處理的任務為計算密集型任務時,即等待時間w為0,此時核心線程數等於CPU核心數。

上述計算公式是理想情況下的建議核心線程數,而不同系統/應用在運行不同的任務時可能會有一定的差異,因此最佳線程數參數還需要根據任務的實際運行情況和壓測表現進行微調。

為了更好地發現、分析和解決問題,建議在使用多線程時增加對異常的處理,異常處理通常有下述方案:

為了實現優雅停機的目標,我們應當先調用shutdown方法,調用這個方法也就意味著,這個線程池不會再接收任何新的任務,但是已經提交的任務還會繼續執行。之後我們還應當調用awaitTermination方法,這個方法可以設定線程池在關閉之前的最大超時時間,如果在超時時間結束之前線程池能夠正常關閉則會返回true,否則,超時會返回false。通常我們需要根據業務場景預估一個合理的超時時間,然後調用該方法。

如果awaitTermination方法返回false,但又希望盡可能在線程池關閉之後再做其他資源回收工作,可以考慮再調用一下shutdownNow方法,此時隊列中所有尚未被處理的任務都會被丟棄,同時會設置線程池中每個線程的中斷標志位。shutdownNow並不保證一定可以讓正在運行的線程停止工作,除非提交給線程的任務能夠正確響應中斷。

ThreadLocal線程變數概述

ThreadLocal類提供了線程本地變數(thread-local variables),這些變數不同於普通的變數,訪問線程本地變數的每個線程(通過其get或set方法)都有其自己的獨立初始化的變數副本,因此ThreadLocal沒有多線程競爭的問題,不需要單獨進行加鎖。

ThreadLocal的原理與實踐

對於ThreadLocal而言,常用的方法有get/set/initialValue 3個方法。

眾所周知,在java中SimpleDateFormat有線程安全問題,為了安全地使用SimpleDateFormat,除了1)創建SimpleDateFormat局部變數;和2)加同步鎖 兩種方案外,我們還可以使用3)ThreadLocal的方案:

Thread 內部維護了一個 ThreadLocal.ThreadLocalMap 實例(threadLocals),ThreadLocal 的操作都是圍繞著 threadLocals 來操作的。

從JDK源碼可見,ThreadLocalMap中的Entry是弱引用類型的,這就意味著如果這個ThreadLocal只被這個Entry引用,而沒有被其他對象強引用時,就會在下一次GC的時候回收掉。

EagleEye(鷹眼)作為全鏈路監控系統在集團內部被廣泛使用,traceId、rpcId、壓測標等信息存儲在EagleEye的ThreadLocal變數中,並在HSF/Dubbo服務調用間進行傳遞。EagleEye通過Filter將數據初始化到ThreadLocal中,部分相關代碼如下:

在EagleEyeFilter中,通過EagleEyeRequestTracer.startTrace方法進行初始化,在前置入參轉換後,通過startTrace重載方法將鷹眼上下文參數存入ThreadLocal中,相關代碼如下:

EagleEyeFilter在finally代碼塊中,通過EagleEyeRequestTracer.endTrace方法結束調用鏈,通過clear方法將ThreadLocal中的數據進行清理,相關代碼實現如下:

在某權益領取原有鏈路中,通過app打開一級頁面後才能發起權益領取請求,請求經過淘系無線網關(Mtop)後到達服務端,服務端通過mtop sdk獲取當前會話信息。

在XX項目中,對權益領取鏈路進行了升級改造,在一級頁面請求時,通過服務端同時發起權益領取請求。具體地,服務端在處理一級頁面請求時,同時通過調用hsf/bbo介面來進行權益領取,因此在發起rpc調用時需要攜帶用戶當前會話信息,在服務提供端將會話信息進行提取並注入到mtop上下文,從而才能通過mtop sdk獲取到會話id等信息。某開發同學在實現時,因ThreadLocal使用不當造成下述問題:

【問題1:權益領取失敗分析】

在權益領取服務中,該應用構建了一套高效和線程安全的依賴注入框架,基於該框架的業務邏輯模塊通常抽象為xxxMole形式,Mole間為網狀依賴關系,框架會按依賴關系自動調用init方法(其中,被依賴的mole 的init方法先執行)。

在應用中,權益領取介面的主入口為CommonXXApplyMole類,CommonXXApplyMole依賴XXSessionMole。當請求來臨時,會按依賴關系依次調用init方法,因此XXSessionMole的init方法會優先執行;而開發同學在CommonXXApplyMole類中的init方法中通過調用recoverMtopContext()方法來期望恢復mtop上下文,因recoverMtopContext()方法的調用時機過晚,從而導致XXSessionMole模塊獲取不到正確的會話id等信息而導致權益領取失敗。

【問題2:臟數據分析】

權益領取服務在處理請求時,若當前線程曾經處理過權益領取請求,因ThreadLocal變數值未被清理,此時XXSessionMole通過mtop SDK獲取會話信息時得到的是前一次請求的會話信息,從而造成臟數據。

【解決方案】

在依賴注入框架入口處AbstractGate#visit(或在XXSessionMole中)通過recoverMtopContext方法注入mtop上下文信息,並在入口方法的finally代碼塊清理當前請求的threadlocal變數值。

若使用強引用類型,則threadlocal的引用鏈為:Thread -> ThreadLocal.ThreadLocalMap -> Entry[] -> Entry -> key(threadLocal對象)和value;在這種場景下,只要這個線程還在運行(如線程池場景),若不調用remove方法,則該對象及關聯的所有強引用對象都不會被垃圾回收器回收。

若使用static關鍵字進行修飾,則一個線程僅對應一個線程變數;否則,threadlocal語義變為perThread-perInstance,容易引發內存泄漏,如下述示例:

在上述main方法第22行debug,可見線程的threadLocals變數中有3個threadlocal實例。在工程實踐中,使用threadlocal時通常期望一個線程只有一個threadlocal實例,因此,若不使用static修飾,期望的語義發生了變化,同時易引起內存泄漏。

如果不執行清理操作,則可能會出現:

建議使用try...finally 進行清理。

我們在使用ThreadLocal時,通常期望的語義是perThread,若不使用static進行修飾,則語義變為perThread-perInstance;在線程池場景下,若不用static進行修飾,創建的線程相關實例可能會達到 M * N個(其中M為線程數,N為對應類的實例數),易造成內存泄漏(https://errorprone.info/bugpattern/ThreadLocalUsage)。

在應用中,謹慎使用ThreadLocal.withInitial(Supplier<? extends S> supplier)這個工廠方法創建ThreadLocal對象,一旦不同線程的ThreadLocal使用了同一個Supplier對象,那麼隔離也就無從談起了,如:

總結

在java工程實踐中,線程池和線程變數被廣泛使用,因線程池和線程變數的不當使用經常造成安全生產事故,因此,正確使用線程池和線程變數是每一位開發人員必須修煉的基本功。本文從線程池和線程變數的使用出發,簡要介紹了線程池和線程變數的原理和使用實踐,各開發人員可結合最佳實踐和實際應用場景,正確地使用線程和線程變數,構建出穩定、高效的java應用服務。

Ⅱ 源碼修煉筆記之Dubbo線程池策略

FixedThreadPool

FixThreadPool內部是通過ThreadPoolExecutor來創建線程,核心線程數和最大線程數都是上下文中指定的線程數量threads,因為不存在空閑線程所以keepAliveTime為0,
當queues=0,創建SynchronousQueue阻塞隊列;
當queues<0,創建無界的阻塞隊列LinkedBlockingQueue;
當queues>0,創建有界的阻塞隊列LinkedBlockingQueue。
採用bbo自己實現的線程工廠NamedInternalThreadFactory,將線程置為守護線程(Demon)
拒絕策略為AbortPolicyWithReport,策略為將調用時的堆棧信息保存到本地文件中,並拋出異常RejectedExecutionException

CachedThreadPool

CachedThreadPool與FixedThreadPool的區別是核心線程數和最大線程數不相等,通過alive來控制空閑線程的釋放

LimitedThreadPool

LimitedThreadPool與CachedThreadPool的區別是空閑線程的超時時間為Long.MAX_VALUE,相當於線程數量不會動態變化了,創建的線程不會被釋放。

EagerThreadPool

與上述三種線程池不同,EagerThreadPool並非通過JUC中的ThreadPoolExecutor來創建線程池,而是通過EagerThreadPoolExecutor來創建線程池,EagerThreadPoolExecutor繼承自ThreadPoolExecutor,實現自定義的execute方法,採用的阻塞隊列是TaskQueue,TaskQueue繼承自LinkedBlockingQueue。

execute方法首先調用ThreadPoolExecutor的execute方法,如果執行失敗會重新放入TaskQueue進行重試。

實現自定義的ThreadPool

ThreadPool被定義為一個擴展點,如下所示,

其默認實現是FixedThreadPool,可以通過實現該擴展來實現自定義的線程池策略。

Ⅲ Android線程池ThreadPoolExecutor詳解

       傳統的多線程是通過繼承Thread類及實現Runnable介面來實現的,每次創建及銷毀線程都會消耗資源、響應速度慢,且線程缺乏統一管理,容易出現阻塞的情況,針對以上缺點,線程池就出現了。

       線程池是一個創建使用線程並能保存使用過的線程以達到復用的對象,簡單的說就是一塊緩存了一定數量線程的區域。

       1.復用線程:線程執行完不會立刻退出,繼續執行其他線程;
       2.管理線程:統一分配、管理、控制最大並發數;

       1.降低因頻繁創建&銷毀線程帶來的性能開銷,復用緩存在線程池中的線程;
       2.提高線程執行效率&響應速度,復用線程:響應速度;管理線程:優化線程執行順序,避免大量線程搶占資源導致阻塞現象;
       3.提高對線程的管理度;

       線程池的使用也比較簡單,流程如下:

       接下來通過源碼來介紹一下ThreadPoolExecutor內部實現及工作原理。

       線程池的最終實現類是ThreadPoolExecutor,通過實現可以一步一步的看到,父介面為Executor:

       其他的繼承及實現關系就不一一列舉了,直接通過以下圖來看一下:

       從構造方法開始看:

       通過以上可以看到,在創建ThreadPoolExecutor時,對傳入的參數是有要求的:corePoolSize不能小於0;maximumPoolSize需要大於0,且需要大於等於corePoolSize;keepAliveTime大於0;workQueue、threadFactory都不能為null。
       在創建完後就需要執行Runnable了,看以下execute()方法:

       在execute()內部主要執行的邏輯如下:
       分析點1:如果當前線程數未超過核心線程數,則將runnable作為參數執行addWorker(),true表示核心線程,false表示非核心線程;
       分析點2:核心線程滿了,如果線程池處於運行狀態則往workQueue隊列中添加任務,接下來判斷是否需要拒絕或者執行addWorker();
       分析點3:以上都不滿足時 [corePoolSize=0且沒有運行的線程,或workQueue已經滿了] ,執行addWorker()添加runnable,失敗則執行拒絕策略;
        總結一下:線程池對線程創建的管理,流程圖如下:

       在執行addWorker時,主要做了以下兩件事:
       分析點1:將runnable作為參數創建Worker對象w,然後獲取w內部的變數thread;
       分析點2:調用start()來啟動thread;
       在addWorker()內部會將runnable作為參數傳給Worker,然後從Worker內部讀取變數thread,看一下Worker類的實現:

       Worker實現了Runnable介面,在Worker內部,進行了賦值及創建操作,先將execute()時傳入的runnable賦值給內部變數firstTask,然後通過ThreadFactory.newThread(this)創建Thread,上面講到在addWorker內部執行t.start()後,會執行到Worker內部的run()方法,接著會執行runWorker(this),一起看一下:

       前面可以看到,runWorker是執行在子線程內部,主要執行了三件事:
       分析1:獲取當前線程,當執行shutdown()時需要將線程interrupt(),接下來從Worker內部取到firstTask,即execute傳入的runnable,接下來會執行;
       分析2:while循環,task不空直接執行;否則執行getTask()去獲取,不為空直接執行;
       分析3:對有效的task執行run(),由於是在子線程中執行,因此直接run()即可,不需要start();
       前面看到,在while內部有執行getTask(),一起看一下:

       getTask()是從workQueue內部獲取接下來需要執行的runnable,內部主要做了兩件事:
       分析1:先獲取到當前正在執行工作的線程數量wc,通過判斷allowCoreThreadTimeOut[在創建ThreadPoolExecutor時可以進行設置]及wc > corePoolSize來確定timed值;
       分析2:通過timed值來決定執行poll()或者take(),如果WorkQueue中有未執行的線程時,兩者作用是相同的,立刻返回線程;如果WorkQueue中沒有線程時,poll()有超時返回,take()會一直阻塞;如果allowCoreThreadTimeOut為true,則核心線程在超時時間沒有使用的話,是需要退出的;wc > corePoolSize時,非核心線程在超時時間沒有使用的話,是需要退出的;
       allowCoreThreadTimeOut是可以通過以下方式進行設置的:

       如果沒有進行設置,那麼corePoolSize數量的核心線程會一直存在。
        總結一下:ThreadPoolExecutor內部的核心線程如何確保一直存在,不退出?
       上面分析已經回答了這個問題,每個線程在執行時會執行runWorker(),而在runWorker()內部有while()循環會判斷getTask(),在getTask()內部會對當前執行的線程數量及allowCoreThreadTimeOut進行實時判斷,如果工作數量大於corePoolSize且workQueue中沒有未執行的線程時,會執行poll()超時退出;如果工作數量不大於corePoolSize且workQueue中沒有未執行的線程時,會執行take()進行阻塞,確保有corePoolSize數量的線程阻塞在runWorker()內部的while()循環不退出。
       如果需要關閉線程池,需要如何操作呢,看一下shutdown()方法:

       以上可以看到,關閉線程池的原理:a. 遍歷線程池中的所有工作線程;b. 逐個調用線程的interrupt()中斷線程(註:無法響應中斷的任務可能永遠無法終止)
       也可調用shutdownNow()來關閉線程池,二者區別:
       shutdown():設置線程池的狀態為SHUTDOWN,然後中斷所有沒有正在執行任務的線程;
       shutdownNow():設置線程池的狀態為STOP,然後嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表;
       使用建議:一般調用shutdown()關閉線程池;若任務不一定要執行完,則調用shutdownNow();
        總結一下:ThreadPoolExecutor在執行execute()及shutdown()時的調用關系,流程圖如下:

       線程池可以通過Executors來進行不同類型的創建,具體分為四種不同的類型,如下:

       可緩存線程池:不固定線程數量,且支持最大為Integer.MAX_VALUE的線程數量:

       1、線程數無限制
       2、有空閑線程則復用空閑線程,若無空閑線程則新建線程
       3、一定程度上減少頻繁創建/銷毀線程,減少系統開銷

       固定線程數量的線程池:定長線程池

       1、可控制線程最大並發數(同時執行的線程數)
       2、超出的線程會在隊列中等待。

       單線程化的線程池:可以理解為線程數量為1的FixedThreadPool

       1、有且僅有一個工作線程執行任務
       2、所有任務按照指定順序執行,即遵循隊列的入隊出隊規則

       定時以指定周期循環執行任務

       一般來說,等待隊列 BlockingQueue 有: ArrayBlockingQueue 、 LinkedBlockingQueue 與 SynchronousQueue 。
       假設向線程池提交任務時,核心線程都被佔用的情況下:
        ArrayBlockingQueue :基於數組的阻塞隊列,初始化需要指定固定大小。
       當使用此隊列時,向線程池提交任務,會首先加入到等待隊列中,當等待隊列滿了之後,再次提交任務,嘗試加入隊列就會失敗,這時就會檢查如果當前線程池中的線程數未達到最大線程,則會新建線程執行新提交的任務。所以最終可能出現後提交的任務先執行,而先提交的任務一直在等待。
        LinkedBlockingQueue :基於鏈表實現的阻塞隊列,初始化可以指定大小,也可以不指定。
       當指定大小後,行為就和 ArrayBlockingQueue一致。而如果未指定大小,則會使用默認的 Integer.MAX_VALUE 作為隊列大小。這時候就會出現線程池的最大線程數參數無用,因為無論如何,向線程池提交任務加入等待隊列都會成功。最終意味著所有任務都是在核心線程執行。如果核心線程一直被占,那就一直等待。
        SynchronousQueue :無容量的隊列。
       使用此隊列意味著希望獲得最大並發量。因為無論如何,向線程池提交任務,往隊列提交任務都會失敗。而失敗後如果沒有空閑的非核心線程,就會檢查如果當前線程池中的線程數未達到最大線程,則會新建線程執行新提交的任務。完全沒有任何等待,唯一制約它的就是最大線程數的個數。因此一般配合Integer.MAX_VALUE就實現了真正的無等待。
       但是需要注意的是, 進程的內存是存在限制的,而每一個線程都需要分配一定的內存。所以線程並不能無限個。

Ⅳ 並發編程解惑之線程

主要內容:

進程是資源分配的最小單位,每個進程都有獨立的代碼和數據空間,一個進程包含 1 到 n 個線程。線程是 CPU 調度的最小單位,每個線程有獨立的運行棧和程序計數器,線程切換開銷小。

Java 程序總是從主類的 main 方法開始執行,main 方法就是 Java 程序默認的主線程,而在 main 方法中再創建的線程就是其他線程。在 Java 中,每次程序啟動至少啟動 2 個線程。一個是 main 線程,一個是垃圾收集線程。每次使用 Java 命令啟動一個 Java 程序,就相當於啟動一個 JVM 實例,而每個 JVM 實例就是在操作系統中啟動的一個進程。

多線程可以通過繼承或實現介面的方式創建。

Thread 類是 JDK 中定義的用於控制線程對象的類,該類中封裝了線程執行體 run() 方法。需要強調的一點是,線程執行先後與創建順序無關。

通過 Runnable 方式創建線程相比通過繼承 Thread 類創建線程的優勢是避免了單繼承的局限性。若一個 boy 類繼承了 person 類,boy 類就無法通過繼承 Thread 類的方式來實現多線程。

使用 Runnable 介面創建線程的過程:先是創建對象實例 MyRunnable,然後將對象 My Runnable 作為 Thread 構造方法的入參,來構造出線程。對於 new Thread(Runnable target) 創建的使用同一入參目標對象的線程,可以共享該入參目標對象 MyRunnable 的成員變數和方法,但 run() 方法中的局部變數相互獨立,互不幹擾。

上面代碼是 new 了三個不同的 My Runnable 對象,如果只想使用同一個對象,可以只 new 一個 MyRunnable 對象給三個 new Thread 使用。

實現 Runnable 介面比繼承 Thread 類所具有的優勢:

線程有新建、可運行、阻塞、等待、定時等待、死亡 6 種狀態。一個具有生命的線程,總是處於這 6 種狀態之一。 每個線程可以獨立於其他線程運行,也可和其他線程協同運行。線程被創建後,調用 start() 方法啟動線程,該線程便從新建態進入就緒狀態。

NEW 狀態(新建狀態) 實例化一個線程之後,並且這個線程沒有開始執行,這個時候的狀態就是 NEW 狀態:

RUNNABLE 狀態(就緒狀態):

阻塞狀態有 3 種:

如果一個線程調用了一個對象的 wait 方法, 那麼這個線程就會處於等待狀態(waiting 狀態)直到另外一個線程調用這個對象的 notify 或者 notifyAll 方法後才會解除這個狀態。

run() 里的代碼執行完畢後,線程進入終結狀態(TERMINATED 狀態)。

線程狀態有 6 種:新建、可運行、阻塞、等待、定時等待、死亡。

我們看下 join 方法的使用:

運行結果:

我們來看下 yield 方法的使用:

運行結果:

線程與線程之間是無法直接通信的,A 線程無法直接通知 B 線程,Java 中線程之間交換信息是通過共享的內存來實現的,控制共享資源的讀寫的訪問,使得多個線程輪流執行對共享數據的操作,線程之間通信是通過對共享資源上鎖或釋放鎖來實現的。線程排隊輪流執行共享資源,這稱為線程的同步。

Java 提供了很多同步操作(也就是線程間的通信方式),同步可使用 synchronized 關鍵字、Object 類的 wait/notifyAll 方法、ReentrantLock 鎖、無鎖同步 CAS 等方式來實現。

ReentrantLock 是 JDK 內置的一個鎖對象,用於線程同步(線程通信),需要用戶手動釋放鎖。

運行結果:

這表明同一時間段只能有 1 個線程執行 work 方法,因為 work 方法里的代碼需要獲取到鎖才能執行,這就實現了多個線程間的通信,線程 0 獲取鎖,先執行,線程 1 等待,線程 0 釋放鎖,線程 1 繼續執行。

synchronized 是一種語法級別的同步方式,稱為內置鎖。該鎖會在代碼執行完畢後由 JVM 釋放。

輸出結果跟 ReentrantLock 一樣。

Java 中的 Object 類默認是所有類的父類,該類擁有 wait、 notify、notifyAll 方法,其他對象會自動繼承 Object 類,可調用 Object 類的這些方法實現線程間的通信。

除了可以通過鎖的方式來實現通信,還可通過無鎖的方式來實現,無鎖同 CAS(Compare-and-Swap,比較和交換)的實現,需要有 3 個操作數:內存地址 V,舊的預期值 A,即將要更新的目標值 B,當且僅當內存地址 V 的值與預期值 A 相等時,將內存地址 V 的值修改為目標值 B,否則就什麼都不做。

我們通過計算器的案例來演示無鎖同步 CAS 的實現方式,非線程安全的計數方式如下:

線程安全的計數方式如下:

運行結果:

線程安全累加的結果才是正確的,非線程安全會出現少計算值的情況。JDK 1.5 開始,並發包里提供了原子操作的類,AtomicBoolean 用原子方式更新的 boolean 值,AtomicInteger 用原子方式更新 int 值,AtomicLong 用原子方式更新 long 值。 AtomicInteger 和 AtomicLong 還提供了用原子方式將當前值自增 1 或自減 1 的方法,在多線程程序中,諸如 ++i 或 i++ 等運算不具有原子性,是不安全的線程操作之一。 通常我們使用 synchronized 將該操作變成一個原子操作,但 JVM 為此種操作提供了原子操作的同步類 Atomic,使用 AtomicInteger 做自增運算的性能是 ReentantLock 的好幾倍。

上面我們都是使用底層的方式實現線程間的通信的,但在實際的開發中,我們應該盡量遠離底層結構,使用封裝好的 API,例如 J.U.C 包(java.util.concurrent,又稱並發包)下的工具類 CountDownLath、CyclicBarrier、Semaphore,來實現線程通信,協調線程執行。

CountDownLatch 能夠實現線程之間的等待,CountDownLatch 用於某一個線程等待若干個其他線程執行完任務之後,它才開始執行。

CountDownLatch 類只提供了一個構造器:

CountDownLatch 類中常用的 3 個方法:

運行結果:

CyclicBarrier 字面意思循環柵欄,通過它可以讓一組線程等待至某個狀態之後再全部同時執行。當所有等待線程都被釋放以後,CyclicBarrier 可以被重復使用,所以有循環之意。

相比 CountDownLatch,CyclicBarrier 可以被循環使用,而且如果遇到線程中斷等情況時,可以利用 reset() 方法,重置計數器,CyclicBarrier 會比 CountDownLatch 更加靈活。

CyclicBarrier 提供 2 個構造器:

上面的方法中,參數 parties 指讓多少個線程或者任務等待至 barrier 狀態;參數 barrierAction 為當這些線程都達到 barrier 狀態時會執行的內容。

CyclicBarrier 中最重要的方法 await 方法,它有 2 個重載版本。下面方法用來掛起當前線程,直至所有線程都到達 barrier 狀態再同時執行後續任務。

而下面的方法則是讓這些線程等待至一定的時間,如果還有線程沒有到達 barrier 狀態就直接讓到達 barrier 的線程執行任務。

運行結果:

CyclicBarrier 用於一組線程互相等待至某個狀態,然後這一組線程再同時執行,CountDownLatch 是不能重用的,而 CyclicBarrier 可以重用。

Semaphore 類是一個計數信號量,它可以設定一個閾值,多個線程競爭獲取許可信號,執行完任務後歸還,超過閾值後,線程申請許可信號時將會被阻塞。Semaphore 可以用來 構建對象池,資源池,比如資料庫連接池。

假如在伺服器上運行著若干個客戶端請求的線程。這些線程需要連接到同一資料庫,但任一時刻只能獲得一定數目的資料庫連接。要怎樣才能夠有效地將這些固定數目的資料庫連接分配給大量的線程呢?

給方法加同步鎖,保證同一時刻只能有一個線程去調用此方法,其他所有線程排隊等待,但若有 10 個資料庫連接,也只有一個能被使用,效率太低。另外一種方法,使用信號量,讓信號量許可與資料庫可用連接數為相同數量,10 個資料庫連接都能被使用,大大提高性能。

上面三個工具類是 J.U.C 包的核心類,J.U.C 包的全景圖就比較復雜了:

J.U.C 包(java.util.concurrent)中的高層類(Lock、同步器、阻塞隊列、Executor、並發容器)依賴基礎類(AQS、非阻塞數據結構、原子變數類),而基礎類是通過 CAS 和 volatile 來實現的。我們盡量使用頂層的類,避免使用基礎類 CAS 和 volatile 來協調線程的執行。J.U.C 包其他的內容,在其他的篇章會有相應的講解。

Future 是一種非同步執行的設計模式,類似 ajax 非同步請求,不需要同步等待返回結果,可繼續執行代碼。使 Runnable(無返回值不支持上報異常)或 Callable(有返回值支持上報異常)均可開啟線程執行任務。但是如果需要非同步獲取線程的返回結果,就需要通過 Future 來實現了。

Future 是位於 java.util.concurrent 包下的一個介面,Future 介面封裝了取消任務,獲取任務結果的方法。

在 Java 中,一般是通過繼承 Thread 類或者實現 Runnable 介面來創建多線程, Runnable 介面不能返回結果,JDK 1.5 之後,Java 提供了 Callable 介面來封裝子任務,Callable 介面可以獲取返回結果。我們使用線程池提交 Callable 介面任務,將返回 Future 介面添加進 ArrayList 數組,最後遍歷 FutureList,實現非同步獲取返回值。

運行結果:

上面就是非同步線程執行的調用過程,實際開發中用得更多的是使用現成的非同步框架來實現非同步編程,如 RxJava,有興趣的可以繼續去了解,通常非同步框架都是結合遠程 HTTP 調用 Retrofit 框架來使用的,兩者結合起來用,可以避免調用遠程介面時,花費過多的時間在等待介面返回上。

線程封閉是通過本地線程 ThreadLocal 來實現的,ThreadLocal 是線程局部變數(local vari able),它為每個線程都提供一個變數值的副本,每個線程對該變數副本的修改相互不影響。

在 JVM 虛擬機中,堆內存用於存儲共享的數據(實例對象),也就是主內存。Thread Local .set()、ThreadLocal.get() 方法直接在本地內存(工作內存)中寫和讀共享變數的副本,而不需要同步數據,不用像 synchronized 那樣保證數據可見性,修改主內存數據後還要同步更新到工作內存。

Myabatis、hibernate 是通過 threadlocal 來存儲 session 的,每一個線程都維護著一個 session,對線程獨享的資源操作很方便,也避免了線程阻塞。

ThreadLocal 類位於 Thread 線程類內部,我們分析下它的源碼:

ThreadLocal 和 Synchonized 都用於解決多線程並發訪問的問題,訪問多線程共享的資源時,Synchronized 同步機制採用了以時間換空間的方式,提供一份變數讓多個線程排隊訪問,而 ThreadLocal 採用了以空間換時間的方式,提供每個線程一個變數,實現數據隔離。

ThreadLocal 可用於資料庫連接 Connection 對象的隔離,使得每個請求線程都可以復用連接而又相互不影響。

在 Java 裡面,存在強引用、弱引用、軟引用、虛引用。我們主要來了解下強引用和弱引用:

上面 a、b 對實例 A、B 都是強引用

而上面這種情況就不一樣了,即使 b 被置為 null,但是 c 仍然持有對 C 對象實例的引用,而間接的保持著對 b 的強引用,所以 GC 不會回收分配給 b 的空間,導致 b 無法回收也沒有被使用,造成了內存泄漏。這時可以通過 c = null; 來使得 c 被回收,但也可以通過弱引用來達到同樣目的:

從源碼中可以看出 Entry 里的 key 對 ThreadLocal 實例是弱引用:

Entry 里的 key 對 ThreadLocal 實例是弱引用,將 key 值置為 null,堆中的 ThreadLocal 實例是可以被垃圾收集器(GC)回收的。但是 value 卻存在一條從 Current Thread 過來的強引用鏈,只有當當前線程 Current Thread 銷毀時,value 才能被回收。在 threadLocal 被設為 null 以及線程結束之前,Entry 的鍵值對都不會被回收,出現內存泄漏。為了避免泄漏,在 ThreadLocalMap 中的 set/get Entry 方法里,會對 key 為 null 的情況進行判斷,如果為 null 的話,就會對 value 置為 null。也可以通過 ThreadLocal 的 remove 方法(類似加鎖和解鎖,最後 remove 一下,解鎖對象的引用)直接清除,釋放內存空間。

總結來說,利用 ThreadLocal 來訪問共享數據時,JVM 通過設置 ThreadLocalMap 的 Key 為弱引用,來避免內存泄露,同時通過調用 remove、get、set 方法的時候,回收弱引用(Key 為 null 的 Entry)。當使用 static ThreadLocal 的時候(如上面的 Spring 多數據源),static 變數在類未載入的時候,它就已經載入,當線程結束的時候,static 變數不一定會被回收,比起普通成員變數使用的時候才載入,static 的生命周期變長了,若沒有及時回收,容易產生內存泄漏。

使用線程池,可以重用存在的線程,減少對象創建、消亡的開銷,可控制最大並發線程數,避免資源競爭過度,還能實現線程定時執行、單線程執行、固定線程數執行等功能。

Java 把線程的調用封裝成了一個 Executor 介面,Executor 介面中定義了一個 execute 方法,用來提交線程的執行。Executor 介面的子介面是 ExecutorService,負責管理線程的執行。通過 Executors 類的靜態方法可以初始化

ExecutorService 線程池。Executors 類的靜態方法可創建不同類型的線程池:

但是,不建議使用 Executors 去創建線程池,而是通過 ThreadPoolExecutor 的方式,明確給出線程池的參數去創建,規避資源耗盡的風險。

如果使用 Executors 去創建線程池:

最佳的實踐是通過 ThreadPoolExecutor 手動地去創建線程池,選取合適的隊列存儲任務,並指定線程池線程大小。通過線程池實現類 ThreadPoolExecutor 可構造出線程池的,構造函數有下面幾個重要的參數:

參數 1:corePoolSize

線程池核心線程數。

參數 2:workQueue

阻塞隊列,用於保存執行任務的線程,有 4 種阻塞隊列可選:

參數 3:maximunPoolSize

線程池最大線程數。如果阻塞隊列滿了(有界的阻塞隊列),來了一個新的任務,若線程池當前線程數小於最大線程數,則創建新的線程執行任務,否則交給飽和策略處理。如果是無界隊列就不存在這種情況,任務都在無界隊列里存儲著。

參數 4:RejectedExecutionHandler

拒絕策略,當隊列滿了,而且線程達到了最大線程數後,對新任務採取的處理策略。

有 4 種策略可選:

最後,還可以自定義處理策略。

參數 5:ThreadFactory

創建線程的工廠。

參數 6:keeyAliveTime

線程沒有任務執行時最多保持多久時間終止。當線程池中的線程數大於 corePoolSize 時,線程池中所有線程中的某一個線程的空閑時間若達到 keepAliveTime,則會終止,直到線程池中的線程數不超過 corePoolSize。但如果調用了 allowCoreThread TimeOut(boolean value) 方法,線程池中的線程數就算不超過 corePoolSize,keepAlive Time 參數也會起作用,直到線程池中的線程數量變為 0。

參數 7:TimeUnit

配合第 6 個參數使用,表示存活時間的時間單位最佳的實踐是通過 ThreadPoolExecutor 手動地去創建線程池,選取合適的隊列存儲任務,並指定線程池線程大小。

運行結果:

線程池創建線程時,會將線程封裝成工作線程 Worker,Worker 在執行完任務後,還會不斷的去獲取隊列里的任務來執行。Worker 的加鎖解鎖機制是繼承 AQS 實現的。

我們來看下 Worker 線程的運行過程:

總結來說,如果當前運行的線程數小於 corePoolSize 線程數,則獲取全局鎖,然後創建新的線程來執行任務如果運行的線程數大於等於 corePoolSize 線程數,則將任務加入阻塞隊列 BlockingQueue 如果阻塞隊列已滿,無法將任務加入 BlockingQueue,則獲取全局所,再創建新的線程來執行任務

如果新創建線程後使得線程數超過了 maximumPoolSize 線程數,則調用 Rejected ExecutionHandler.rejectedExecution() 方法根據對應的拒絕策略處理任務。

CPU 密集型任務,線程執行任務佔用 CPU 時間會比較長,應該配置相對少的線程數,避免過度爭搶資源,可配置 N 個 CPU+1 個線程的線程池;但 IO 密集型任務則由於需要等待 IO 操作,線程經常處於等待狀態,應該配置相對多的線程如 2*N 個 CPU 個線程,A 線程阻塞後,B 線程能馬上執行,線程多競爭激烈,能飽和的執行任務。線程提交 SQL 後等待資料庫返回結果時間較長的情況,CPU 空閑會較多,線程數應設置大些,讓更多線程爭取 CPU 的調度。

Ⅳ 【Java基礎】線程池的原理是什麼

什麼是線程池?

總歸為:池化技術 ---》資料庫連接池 緩存架構 緩存池 線程池 內存池,連接池,這種思想演變成緩存架構技術---> JDK設計思想有千絲萬縷的聯系

首先我們從最核心的ThreadPoolExecutor類中的方法講起,然後再講述它的實現原理,接著給出了它的使用示例,最後討論了一下如何合理配置線程池的大小。

Java 中的 ThreadPoolExecutor 類

java.uitl.concurrent.ThreadPoolExecutor 類是線程池中最核心的一個類,因此如果要透徹地了解Java 中的線程池,必須先了解這個類。下面我們來看一下 ThreadPoolExecutor 類的具體實現源碼。

在 ThreadPoolExecutor 類中提供了四個構造方法:

閱讀全文

與線程池源碼分析相關的資料

熱點內容
程序員招聘自薦信 瀏覽:693
魔獸鍵位設置命令宏 瀏覽:645
程序員沒有目標了 瀏覽:828
搶答器c程序編程 瀏覽:703
什麼app可以自己玩 瀏覽:76
刨客app是什麼 瀏覽:963
cad輸入命令欄不見了 瀏覽:834
做故事集可以用什麼app 瀏覽:692
qq郵箱發送壓縮包 瀏覽:672
程序員桌面機器人 瀏覽:589
xjr快速開發平台源碼 瀏覽:159
java介面runnable 瀏覽:31
python怎麼運行web伺服器 瀏覽:349
notepad編程代碼 瀏覽:740
什麼安卓的毛病最少 瀏覽:611
hp的pjl設備訪問命令 瀏覽:635
googlewebp圖片壓縮技術 瀏覽:215
tbc薩滿加血宏命令 瀏覽:757
pdf閃 瀏覽:289
手機伺服器地址填什麼 瀏覽:258