三種專門用於線程同步的機制:POSIX信號量,互斥量和條件變數.
在Linux上信號量API有兩組,一組是System V IPC信號量,即PV操作,另外就是POSIX信號量,POSIX信號量的名字都是以sem_開頭.
phshared參數指定信號量的類型,若其值為0,就表示這個信號量是當前進程的局部信號量,否則該信號量可以在多個進程之間共享.value值指定信號量的初始值,一般與下面的sem_wait函數相對應.
其中比較重要的函數sem_wait函數會以原子操作的方式將信號量的值減一,如果信號量的值為零,則sem_wait將會阻塞,信號量的值可以在sem_init函數中的value初始化;sem_trywait函數是sem_wait的非阻塞版本;sem_post函數將以原子的操作對信號量加一,當信號量的值大於0時,其他正在調用sem_wait等待信號量的線程將被喚醒.
這些函數成功時返回0,失敗則返回-1並設置errno.
生產者消費者模型:
生產者對應一個信號量:sem_t procer;
消費者對應一個信號量:sem_t customer;
sem_init(&procer,2)----生產者擁有資源,可以工作;
sem_init(&customer,0)----消費者沒有資源,阻塞;
在訪問公共資源前對互斥量設置(加鎖),確保同一時間只有一個線程訪問數據,在訪問完成後再釋放(解鎖)互斥量.
互斥鎖的運行方式:串列訪問共享資源;
信號量的運行方式:並行訪問共享資源;
互斥量用pthread_mutex_t數據類型表示,在使用互斥量之前,必須使用pthread_mutex_init函數對它進行初始化,注意,使用完畢後需調用pthread_mutex_destroy.
pthread_mutex_init用於初始化互斥鎖,mutexattr用於指定互斥鎖的屬性,若為NULL,則表示默認屬性。除了用這個函數初始化互斥所外,還可以用如下方式初始化:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER。
pthread_mutex_destroy用於銷毀互斥鎖,以釋放佔用的內核資源,銷毀一個已經加鎖的互斥鎖將導致不可預期的後果。
pthread_mutex_lock以原子操作給一個互斥鎖加鎖。如果目標互斥鎖已經被加鎖,則pthread_mutex_lock則被阻塞,直到該互斥鎖佔有者把它給解鎖.
pthread_mutex_trylock和pthread_mutex_lock類似,不過它始終立即返回,而不論被操作的互斥鎖是否加鎖,是pthread_mutex_lock的非阻塞版本.當目標互斥鎖未被加鎖時,pthread_mutex_trylock進行加鎖操作;否則將返回EBUSY錯誤碼。注意:這里討論的pthread_mutex_lock和pthread_mutex_trylock是針對普通鎖而言的,對於其他類型的鎖,這兩個加鎖函數會有不同的行為.
pthread_mutex_unlock以原子操作方式給一個互斥鎖進行解鎖操作。如果此時有其他線程正在等待這個互斥鎖,則這些線程中的一個將獲得它.
三個列印機輪流列印:
輸出結果:
如果說互斥鎖是用於同步線程對共享數據的訪問的話,那麼條件變數就是用於在線程之間同步共享數據的值.條件變數提供了一種線程之間通信的機制:當某個共享數據達到某個值時,喚醒等待這個共享數據的線程.
條件變數會在條件不滿足的情況下阻塞線程.且條件變數和互斥量一起使用,允許線程以無競爭的方式等待特定的條件發生.
其中pthread_cond_broadcast函數以廣播的形式喚醒所有等待目標條件變數的線程,pthread_cond_signal函數用於喚醒一個等待目標條件變數線程.但有時候我們可能需要喚醒一個固定的線程,可以通過間接的方法實現:定義一個能夠唯一標識目標線程的全局變數,在喚醒等待條件變數的線程前先設置該變數為目標線程,然後採用廣播的方式喚醒所有等待的線程,這些線程被喚醒之後都檢查該變數以判斷是否是自己.
採用條件變數+互斥鎖實現生產者消費者模型:
運行結果:
阻塞隊列+生產者消費者
運行結果:
❷ 求linux毫秒級定時器的實現
1 nanosleep函數可以提供最高解析度,一般是納秒級
2 select、poll函數的定時是毫秒級,pselect是納秒級
以上三個函數都可以實現你的要求
❸ linux下多個定時器的實現(C語言),麻煩高手指點哈嘛(急)
給你兩個函數參考
omsTimer函數是處理定時事件,void(*handle)(union sigval v)參數就是處理事件的函數指針。
int omsSetTimer(timer_t *tId,int value,int interval)就是設置定時器。
按你說的,如果要同時起多個定時器,需要定義一個數組timer_t tm[n];int it[n];tm就是定時器結構,it用來記錄對應的定時器是否已經使用,使用中的就是1,沒用的就是0;
主進程消息來了就從it找一個沒用的來omsSetTimer,如果收到終止消息,那omsSetTimer 定時時間為0
int omsTimer(timer_t *tId,int iValue,int iSeconds ,void(*handle)(union sigval v),void * param)
{
struct sigevent se;
struct itimerspec ts;
memset (&se, 0, sizeof (se));
se.sigev_notify = SIGEV_THREAD;
se.sigev_notify_function = handle;
se.sigev_value.sival_ptr = param;
if (timer_create (CLOCK_REALTIME, &se, tId) < 0)
{
return -1;
}
ts.it_value.tv_sec = iValue;
// ts.it_value.tv_sec =3;
//ts.it_value.tv_nsec = (long)(iValue % 1000) * (1000000L);
ts.it_value.tv_nsec = 0;
ts.it_interval.tv_sec = iSeconds;
//ts.it_interval.tv_nsec = (long)(iSeconds % 1000) * (1000000L);
ts.it_interval.tv_nsec = 0;
if (timer_settime(*tId, TIMER_ABSTIME, &ts, NULL) < 0)
{
return -1;
}
return 0;
}
int omsSetTimer(timer_t *tId,int value,int interval)
{
struct itimerspec ts;
ts.it_value.tv_sec =value;
//ts.it_value.tv_nsec = (long)(value % 1000) * (1000000L);
ts.it_value.tv_nsec = 0;
ts.it_interval.tv_sec = interval;
//ts.it_interval.tv_nsec = (long)(interval % 1000) * (1000000L);
ts.it_interval.tv_nsec = 0;
if (timer_settime(*tId, TIMER_ABSTIME, &ts, NULL) < 0)
{
return -1;
}
return 0;
}
❹ Linux 的多線程編程中,如何給線程發信號
不管是在進程還是線程,很多時候我們都會使用一些定時器之類的功能,這里就定時器在多線程的使用說一下。首先在linux編程中定時器函數有alarm()和setitimer(),alarm()可以提供一個基於秒的定時功能,而setitimer可以提供一個基於微妙的定時功能。
alarm()原型:
#include <unistd.h>
unsigned int alarm(unsigned int seconds);
這個函數在使用上很簡單,第一次調用這個函數的時候是設置定時器的初值,下一次調用是重新設置這個值,並會返回上一次定時的剩餘時間。
setitimer()原型:
#include <sys/time.h>
int setitimer(int which, const struct itimerval *value,struct itimerval *ovalue);
這個函數使用起來稍微有點說法,首先是第一個參數which的值,這個參數設置timer的計時策略,which有三種狀態分別是:
ITIMER_REAL:使用系統時間來計數,時間為0時發出SIGALRM信號,這種定時能夠得到一個精準的定時,當然這個定時是相對的,因為到了微秒級別我們的處理器本身就不夠精確。
ITIMER_VIRTUAL:使用進程時間也就是進程分配到的時間片的時間來計數,時間為0是發出SIGVTALRM信號,這種定時顯然不夠准確,因為系統給進程分配時間片不由我們控制。
ITIMER_PROF:上面兩種情況都能夠觸發
第二個參數參數value涉及到兩個結構體:
struct itimerval {
struct timeval it_interval; /* next value */
struct timeval it_value; /* current value */
};
struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};
在結構體itimerval中it_value是定時器當前的值,it_interval是當it_value的為0後重新填充的值。而timeval結構體中的兩個變數就簡單了一個是秒一個是微秒。
上面是這兩個定時函數的說明,這個函數使用本不是很難,可以說是很簡單,但是碰到具體的應用的時候可能就遇到問題了,在多進程編程中使用一般不會碰到什麼問題,這里說的這些問題主要體現在多線程編程中。比如下面這個程序:
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>
void sig_handler(int signo)
{
alarm(2);
printf("alarm signal\n");
}
void *pthread_func()
{
alarm(2);
while(1)
{
pause();
}
}
int main(int argc, char **argv)
{
pthread_t tid;
int retval;
signal(SIGALRM, sig_handler);
if((retval = pthread_create(&tid, NULL, pthread_func, NULL)) < 0)
{
perror("pthread_create");
exit(-1);
}
while(1)
{
printf("main thread\n");
sleep(10);
}
return 0;
}
這個程序的理想結果是:
main thread
alarm signal
alarm signal
alarm signal
alarm signal
alarm signal
main thread
可事實上並不是這樣的,它的結果是:
main pthread
alarm signal
main pthread
alarm signal
main pthread
❺ 在linux環境中,如何實現多線程中使用多個定時器,POSIX定時器可以嗎,如何用
個局御辯人解決了,以下是一個實現:
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#if 1
pthread_attr_t attr;
timer_t hard_timer, software_timer;
struct sigevent hard_evp, software_evp;
static void watchdog_hard_timeout(union sigval v)
{
time_t t;
char p[32];
timer_t *q;
struct itimerspec ts;
int ret;
time(&t);
strftime(p, sizeof(p), "%T", localtime(&t));
printf("watchdog hard timeout!\n");
printf("%s thread %d, val = %u, signal captured.\n", p, (unsigned int)pthread_self(), v.sival_int);
q = (timer_t *)(v.sival_ptr);
printf("hard timer_t:%d add:%p, q:%p!\n", (int)hard_timer, &hard_timer, q);
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
ts.it_value.tv_sec = 6;
ts.it_value.tv_nsec = 0;
ret = timer_settime(*q, CLOCK_REALTIME, &ts, NULL);
if (ret != 0) {
printf("settime err(%d)!\n", ret);
}
}
static void watchdog_software_timeout(union sigval v)
{
time_t t;
char p[32];
timer_t *q;
struct itimerspec ts;
int ret;
time(&t);
strftime(p, sizeof(p), "%T", localtime(&t));
printf("watchdog software timeout!\n");
printf("%s thread %d, val = %u, signal captured.\n"桐缺, p, (unsigned int)pthread_self(), v.sival_int);
q = (timer_t *)(v.sival_ptr);
printf("hard timer_t:%d add:%p, q:%p!\拆讓n", (int)hard_timer, &hard_timer, q);
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
ts.it_value.tv_sec = 10;
ts.it_value.tv_nsec = 0;
ret = timer_settime(*q, CLOCK_REALTIME, &ts, NULL);
if (ret != 0) {
printf("settime err(%d)!\n", ret);
}
}
static void dcmi_sol_pthread_attr_destroy(pthread_attr_t *attr)
{
pthread_attr_destroy(attr);
}
static int dcmi_sol_pthread_attr_init(pthread_attr_t *attr)
{
int ret;
if ((ret = pthread_attr_init(attr) != 0)) {
goto err;
}
if ((ret = pthread_attr_setdetachstate(attr, PTHREAD_CREATE_DETACHED)) != 0) {
dcmi_sol_pthread_attr_destroy(attr);
goto err;
}
/* 設置線程的棧大小,失敗則用系統默認值 */
pthread_attr_setstacksize(attr, 128 * 1024);
return 0;
err:
printf("set ptread attr failed(ret:%d)!\n", ret);
return -1;
}
int main(void)
{
struct itimerspec ts;
int ret;
ret = dcmi_sol_pthread_attr_init(&attr);
if (ret != 0) {
printf("init pthread attributes fail(%d)!\n", ret);
exit(-1);
}
memset(&hard_evp, 0, sizeof(struct sigevent));
hard_evp.sigev_value.sival_ptr = &hard_timer;
hard_evp.sigev_notify = SIGEV_THREAD;
hard_evp.sigev_notify_function = watchdog_hard_timeout;
hard_evp.sigev_notify_attributes = NULL;//&attr;
memset(&software_evp, 0, sizeof(struct sigevent));
software_evp.sigev_value.sival_ptr = &software_timer;
software_evp.sigev_notify = SIGEV_THREAD;
software_evp.sigev_notify_function = watchdog_software_timeout;
software_evp.sigev_notify_attributes = NULL;//&attr;
ret = timer_create(CLOCK_REALTIME, &hard_evp, &hard_timer);
if(ret != 0) {
perror("hard timer_create fail!");
exit(-1);
}
ret = timer_create(CLOCK_REALTIME, &software_evp, &software_timer);
if (ret != 0) {
timer_delete(hard_timer);
perror("software timer_create fail!");
exit(-1);
}
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
ts.it_value.tv_sec = 6;
ts.it_value.tv_nsec = 0;
ret = timer_settime(hard_timer, CLOCK_REALTIME, &ts, NULL);
if(ret != 0) {
perror("hard timer_settime fail!");
timer_delete(hard_timer);
timer_delete(software_timer);
exit(-1);
}
ts.it_value.tv_sec = 10;
ret = timer_settime(software_timer, CLOCK_REALTIME, &ts, NULL);
if(ret != 0) {
perror("hard timer_settime fail!");
timer_delete(hard_timer);
timer_delete(software_timer);
exit(-1);
}
while(1) {
printf("main ready sleep!\n");
sleep(15);
printf("main sleep finish!\n");
}
return 0;
}
#endif
❻ Linux多線程編程
程序代碼test.c共兩個線程,一個主線程,一個讀緩存區的線程:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
char globe_buffer[100];
void *read_buffer_thread(void *arg); //這里先聲明一下讀緩存的線程,具體實現寫在後面了
int main()
{
int res,i;
pthread_t read_thread;
for(i=0;i<20;i++)
globe_buffer[i]=i;
printf("\nTest thread : write buffer finish\n");
sleep(3);\\這里的3秒是多餘,可以不要。
res = pthread_create(&read_thread, NULL, read_buffer_thread, NULL);
if (res != 0)
{
printf("Read Thread creat Error!");
exit(0);
}
sleep(1);
printf("waiting for read thread to finish...\n");
res = pthread_join(read_thread, NULL);
if (res != 0)
{
printf("read thread join failed!\n");
exit(0);
}
printf("read thread test OK, have fun!! exit ByeBye\n");
return 0;
}
void *read_buffer_thread(void *arg)
{
int i,x;
printf("Read buffer thread read data : \n");
for(i=0;i<20;i++)
{
x=globe_buffer[i];
printf("%d ",x);
globe_buffer[i]=0;//清空
}
printf("\nread over\n");
}
---------------------------------------------------------------------------------
以上程序編譯:
gcc -D_REENTRANT test.c -o test.o –lpthread
運行這個程序:
$ ./test.o:
❼ 如何進行Linux下多線程的調試
方法一:PS
在ps命令中,「-T」選項可以開啟線程查看。下面的命令列出了由進程號為<pid>的進程創建的所有線程。
1.$ ps -T -p <pid>
「SID」欄表示線程ID,而「CMD」欄則顯示了線程名稱。
方法二: Top
top命令可以實時顯示各個線程情況。要在top輸出中開啟線程查看,請調用top命令的「-H」選項,該選項會列出所有Linux線程。在top運行時,你也可以通過按「H」鍵將線程查看模式切換為開或關。
1.$ top -H
要讓top輸出某個特定進程<pid>並檢查該進程內運行的線程狀況:
$ top -H -p <pid>
❽ linux 多線程環境下的幾種鎖機制
NO1
互斥量(Mutex)
互斥量是實現最簡單的鎖類型,因此有一些教科書一般以互斥量為例對鎖原語進行描述。互斥量的釋放並不僅僅依賴於釋放操作,還可以引入一個定皮返時器屬性。如果在釋放操作執行前發生定時器超時,則互斥量也會釋放代碼塊或共享存儲區供其他線程訪問。當有異常發生時,可使用try-finally語句來確保互斥量被釋放。定時器狀態或try-finally語句的使用可以避免產生死鎖。
遞歸鎖(Recursive
Lock)
遞歸鎖是指可以被當前持有該鎖的線程重復獲取,而不會導致該線程產生死鎖的鎖類型。對遞歸鎖而言,只有在當前持有線程的獲取鎖操作都有一個釋放操作與之對應時,其他線程才可以獲取該鎖。因此,在使用遞歸鎖時,必須要用足夠的釋放鎖操作來平衡獲取鎖操作,實現這一目標的最佳方式是在單入口單出口代碼塊的兩頭一一對應地使用獲取、釋放操作,做法和在普通鎖中一樣。遞歸鎖在遞歸函數中最有用。但是,總的來說,遞歸鎖比非遞歸鎖速度要慢。需要注意的是:調用線程獲得幾次遞歸鎖必須釋放幾次遞歸鎖。
以下為一個遞歸鎖的示例:
[cpp] view plain
Recursive_Lock L
void recursiveFunction (int count) {
L->acquire()
if (count > 0) {
count = count - 1;
recursiveFunction(count);
}
L->release();
}
讀寫鎖(Read-Write
lock) 讀寫鎖又稱為共享獨占鎖(shared-exclusive
lock)、多讀單寫鎖(multiple-read/single-write lock)或者非互斥信號量(non-mutual
exclusion
semaphore)。讀寫鎖允許多個線程同時進行讀訪問,但是在某一時刻卻最多隻能由一個線程執行寫操作。對於多個線程需要同時讀共享數據卻並不一定進行寫操作的應用來說,讀寫鎖是一種高效的同步機制。對於較長的共享數據,只為其設置一個讀寫鎖會導致較長的訪問時間,最好將其劃分為多個小段並設置多個讀寫鎖以進行同步。
這個讀寫鎖我們在學習資料庫的時候應該很熟悉的喲!
旋轉鎖(Spin
Lock)
旋轉鎖是一種非阻塞鎖,由某個線程獨占。腔指采伍握配用旋轉鎖時,等待線程並不靜態地阻塞在同步點,而是必須「旋轉」,不斷嘗試直到最終獲得該鎖。旋轉鎖多用於多處理器系統中。這是因為,如果在單核處理器中採用旋轉鎖,當一個線程正在「旋轉」時,將沒有執行資源可供另一釋放鎖的線程使用。旋轉鎖適合於任何鎖持有時間少於將一個線程阻塞和喚醒所需時間的場合。線程式控制制的變更,包括線程上下文的切換和線程數據結構的更新,可能比旋轉鎖需要更多的指令周期。旋轉鎖的持有時間應該限制在線程上下文切換時間的50%到100%之間(Kleiman,1996年)。在線程調用其他子系統時,線程不應持有旋轉鎖。對旋轉鎖的不當使用可能會導致線程餓死,因此需謹慎使用這種鎖機制。旋轉鎖導致的餓死問題可使用排隊技術來解決,即每個等待線程按照先進先出的順序或者隊列結構在一個獨立的局部標識上進行旋轉。
學習了這些,果然受益匪淺,在今後的coding中,我得挨個試試咯。
❾ 如何看懂《Linux多線程服務端編程
一:進程和線程
每個進程有自己獨立的地址空間。「在同一個進程」還是「不在同一個進程」是系統功能劃分的重要決策點。《Erlang程序設計》[ERL]把進程比喻為人:
每個人有自己的記憶(內存),人與人通過談話(消息傳遞)來交流,談話既可以是面談(同一台伺服器),也可以在電話里談(不同的伺服器,有網路通信)。面談和電話談的區別在於,面談可以立即知道對方是否死了(crash,SIGCHLD),而電話談只能通過周期性的心跳來判斷對方是否還活著。
有了這些比喻,設計分布式系統時可以採取「角色扮演」,團隊里的幾個人各自扮演一個進程,人的角色由進程的代碼決定(管登錄的、管消息分發的、管買賣的等等)。每個人有自己的記憶,但不知道別人的記憶,要想知道別人的看法,只能通過交談(暫不考慮共享內存這種IPC)。然後就可以思考:
·容錯:萬一有人突然死了
·擴容:新人中途加進來
·負載均衡:把甲的活兒挪給乙做
·退休:甲要修復bug,先別派新任務,等他做完手上的事情就把他重啟
等等各種場景,十分便利。
線程的特點是共享地址空間,從而可以高效地共享數據。一台機器上的多個進程能高效地共享代碼段(操作系統可以映射為同樣的物理內存),但不能共享數據。如果多個進程大量共享內存,等於是把多進程程序當成多線程來寫,掩耳盜鈴。
「多線程」的價值,我認為是為了更好地發揮多核處理器(multi-cores)的效能。在單核時代,多線程沒有多大價值(個人想法:如果要完成的任務是CPU密集型的,那多線程沒有優勢,甚至因為線程切換的開銷,多線程反而更慢;如果要完成的任務既有CPU計算,又有磁碟或網路IO,則使用多線程的好處是,當某個線程因為IO而阻塞時,OS可以調度其他線程執行,雖然效率確實要比任務的順序執行效率要高,然而,這種類型的任務,可以通過單線程的」non-blocking IO+IO multiplexing」的模型(事件驅動)來提高效率,採用多線程的方式,帶來的可能僅僅是編程上的簡單而已)。Alan Cox說過:」A computer is a state machine.Threads are for people who can』t program state machines.」(計算機是一台狀態機。線程是給那些不能編寫狀態機程序的人准備的)如果只有一塊CPU、一個執行單元,那麼確實如Alan Cox所說,按狀態機的思路去寫程序是最高效的。
二:單線程伺服器的常用編程模型
據我了解,在高性能的網路程序中,使用得最為廣泛的恐怕要數」non-blocking IO + IO multiplexing」這種模型,即Reactor模式。
在」non-blocking IO + IO multiplexing」這種模型中,程序的基本結構是一個事件循環(event loop),以事件驅動(event-driven)和事件回調的方式實現業務邏輯:
[cpp] view plain
//代碼僅為示意,沒有完整考慮各種情況
while(!done)
{
int timeout_ms = max(1000, getNextTimedCallback());
int retval = poll(fds, nfds, timeout_ms);
if (retval<0){
處理錯誤,回調用戶的error handler
}else{
處理到期的timers,回調用戶的timer handler
if(retval>0){
處理IO事件,回調用戶的IO event handler
}
}
}
這里select(2)/poll(2)有伸縮性方面的不足(描述符過多時,效率較低),Linux下可替換為epoll(4),其他操作系統也有對應的高性能替代品。
Reactor模型的優點很明顯,編程不難,效率也不錯。不僅可以用於讀寫socket,連接的建立(connect(2)/accept(2)),甚至DNS解析都可以用非阻塞方式進行,以提高並發度和吞吐量(throughput),對於IO密集的應用是個不錯的選擇。lighttpd就是這樣,它內部的fdevent結構十分精妙,值得學習。
基於事件驅動的編程模型也有其本質的缺點,它要求事件回調函數必須是非阻塞的。對於涉及網路IO的請求響應式協議,它容易割裂業務邏輯,使其散布於多個回調函數之中,相對不容易理解和維護。
三:多線程伺服器的常用編程模型
大概有這么幾種:
a:每個請求創建一個線程,使用阻塞式IO操作。在Java 1.4引人NIO之前,這是Java網路編程的推薦做法。可惜伸縮性不佳(請求太多時,操作系統創建不了這許多線程)。
b:使用線程池,同樣使用阻塞式IO操作。與第1種相比,這是提高性能的措施。
c:使用non-blocking IO + IO multiplexing。即Java NIO的方式。
d:Leader/Follower等高級模式。
在默認情況下,我會使用第3種,即non-blocking IO + one loop per thread模式來編寫多線程C++網路服務程序。
1:one loop per thread
此種模型下,程序里的每個IO線程有一個event loop,用於處理讀寫和定時事件(無論周期性的還是單次的)。代碼框架跟「單線程伺服器的常用編程模型」一節中的一樣。
libev的作者說:
One loop per thread is usually a good model. Doing this is almost never wrong, some times a better-performance model exists, but it is always a good start.
這種方式的好處是:
a:線程數目基本固定,可以在程序啟動的時候設置,不會頻繁創建與銷毀。
b:可以很方便地在線程間調配負載。
c:IO事件發生的線程是固定的,同一個TCP連接不必考慮事件並發。
Event loop代表了線程的主循環,需要讓哪個線程幹活,就把timer或IO channel(如TCP連接)注冊到哪個線程的loop里即可:對實時性有要求的connection可以單獨用一個線程;數據量大的connection可以獨佔一個線程,並把數據處理任務分攤到另幾個計算線程中(用線程池);其他次要的輔助性connections可以共享一個線程。
比如,在dbproxy中,一個線程用於專門處理客戶端發來的管理命令;一個線程用於處理客戶端發來的MySQL命令,而與後端資料庫通信執行該命令時,是將該任務分配給所有事件線程處理的。
對於non-trivial(有一定規模)的服務端程序,一般會採用non-blocking IO + IO multiplexing,每個connection/acceptor都會注冊到某個event loop上,程序里有多個event loop,每個線程至多有一個event loop。
多線程程序對event loop提出了更高的要求,那就是「線程安全」。要允許一個線程往別的線程的loop里塞東西,這個loop必須得是線程安全的。
在dbproxy中,線程向其他線程分發任務,是通過管道和隊列實現的。比如主線程accept到連接後,將表示該連接的結構放入隊列,並向管道中寫入一個位元組。計算線程在自己的event loop中注冊管道的讀事件,一旦有數據可讀,就嘗試從隊列中取任務。
2:線程池
不過,對於沒有IO而光有計算任務的線程,使用event loop有點浪費。可以使用一種補充方案,即用blocking queue實現的任務隊列:
[cpp] view plain
typedef boost::function<void()>Functor;
BlockingQueue<Functor> taskQueue; //線程安全的全局阻塞隊列
//計算線程
void workerThread()
{
while (running) //running變數是個全局標志
{
Functor task = taskQueue.take(); //this blocks
task(); //在產品代碼中需要考慮異常處理
}
}
// 創建容量(並發數)為N的線程池
int N = num_of_computing_threads;
for (int i = 0; i < N; ++i)
{
create_thread(&workerThread); //啟動線程
}
//向任務隊列中追加任務
Foo foo; //Foo有calc()成員函數
boost::function<void()> task = boost::bind(&Foo::calc,&foo);
taskQueue.post(task);
除了任務隊列,還可以用BlockingQueue<T>實現數據的生產者消費者隊列,即T是數據類型而非函數對象,queue的消費者從中拿到數據進行處理。其實本質上是一樣的。
3:總結
總結而言,我推薦的C++多線程服務端編程模式為:one (event) loop per thread + thread pool:
event loop用作IO multiplexing,配合non-blockingIO和定時器;
thread pool用來做計算,具體可以是任務隊列或生產者消費者隊列。
以這種方式寫伺服器程序,需要一個優質的基於Reactor模式的網路庫來支撐,muo正是這樣的網路庫。比如dbproxy使用的是libevent。
程序里具體用幾個loop、線程池的大小等參數需要根據應用來設定,基本的原則是「阻抗匹配」(解釋見下),使得CPU和IO都能高效地運作。所謂阻抗匹配原則:
如果池中線程在執行任務時,密集計算所佔的時間比重為 P (0 < P <= 1),而系統一共有 C 個 CPU,為了讓這 C 個 CPU 跑滿而又不過載,線程池大小的經驗公式 T = C/P。(T 是個 hint,考慮到 P 值的估計不是很准確,T 的最佳值可以上下浮動 50%)
以後我再講這個經驗公式是怎麼來的,先驗證邊界條件的正確性。
假設 C = 8,P = 1.0,線程池的任務完全是密集計算,那麼T = 8。只要 8 個活動線程就能讓 8 個 CPU 飽和,再多也沒用,因為 CPU 資源已經耗光了。
假設 C = 8,P = 0.5,線程池的任務有一半是計算,有一半等在 IO 上,那麼T = 16。考慮操作系統能靈活合理地調度 sleeping/writing/running 線程,那麼大概 16 個「50%繁忙的線程」能讓 8 個 CPU 忙個不停。啟動更多的線程並不能提高吞吐量,反而因為增加上下文切換的開銷而降低性能。
如果 P < 0.2,這個公式就不適用了,T 可以取一個固定值,比如 5*C。
另外,公式里的 C 不一定是 CPU 總數,可以是「分配給這項任務的 CPU 數目」,比如在 8 核機器上分出 4 個核來做一項任務,那麼 C=4。
四:進程間通信只用TCP
Linux下進程間通信的方式有:匿名管道(pipe)、具名管道(FIFO)、POSIX消息隊列、共享內存、信號(signals),以及Socket。同步原語有互斥器(mutex)、條件變數(condition variable)、讀寫鎖(reader-writer lock)、文件鎖(record locking)、信號量(semaphore)等等。
進程間通信我首選Sockets(主要指TCP,我沒有用過UDP,也不考慮Unix domain協議)。其好處在於:
可以跨主機,具有伸縮性。反正都是多進程了,如果一台機器的處理能力不夠,很自然地就能用多台機器來處理。把進程分散到同一區域網的多台機器上,程序改改host:port配置就能繼續用;
TCP sockets和pipe都是操作文件描述符,用來收發位元組流,都可以read/write/fcntl/select/poll等。不同的是,TCP是雙向的,Linux的pipe是單向的,進程間雙向通信還得開兩個文件描述符,不方便;而且進程要有父子關系才能用pipe,這些都限制了pipe的使用;
TCP port由一個進程獨占,且進程退出時操作系統會自動回收文件描述符。因此即使程序意外退出,也不會給系統留下垃圾,程序重啟之後能比較容易地恢復,而不需要重啟操作系統(用跨進程的mutex就有這個風險);而且,port是獨占的,可以防止程序重復啟動,後面那個進程搶不到port,自然就沒法初始化了,避免造成意料之外的結果;
與其他IPC相比,TCP協議的一個天生的好處是「可記錄、可重現」。tcpmp和Wireshark是解決兩個進程間協議和狀態爭端的好幫手,也是性能(吞吐量、延遲)分析的利器。我們可以藉此編寫分布式程序的自動化回歸測試。也可以用tcp之類的工具進行壓力測試。TCP還能跨語言,服務端和客戶端不必使用同一種語言。
分布式系統的軟體設計和功能劃分一般應該以「進程」為單位。從宏觀上看,一個分布式系統是由運行在多台機器上的多個進程組成的,進程之間採用TCP長連接通信。
使用TCP長連接的好處有兩點:一是容易定位分布式系統中的服務之間的依賴關系。只要在機器上運行netstat -tpna|grep <port>就能立刻列出用到某服務的客戶端地址(Foreign Address列),然後在客戶端的機器上用netstat或lsof命令找出是哪個進程發起的連接。TCP短連接和UDP則不具備這一特性。二是通過接收和發送隊列的長度也較容易定位網路或程序故障。在正常運行的時候,netstat列印的Recv-Q和Send-Q都應該接近0,或者在0附近擺動。如果Recv-Q保持不變或持續增加,則通常意味著服務進程的處理速度變慢,可能發生了死鎖或阻塞。如果Send-Q保持不變或持續增加,有可能是對方伺服器太忙、來不及處理,也有可能是網路中間某個路由器或交換機故障造成丟包,甚至對方伺服器掉線,這些因素都可能表現為數據發送不出去。通過持續監控Recv-Q和Send-Q就能及早預警性能或可用性故障。以下是服務端線程阻塞造成Recv-Q和客戶端Send-Q激增的例子:
[cpp] view plain
$netstat -tn
Proto Recv-Q Send-Q Local Address Foreign
tcp 78393 0 10.0.0.10:2000 10.0.0.10:39748 #服務端連接
tcp 0 132608 10.0.0.10:39748 10.0.0.10:2000 #客戶端連接
tcp 0 52 10.0.0.10:22 10.0.0.4:55572
五:多線程伺服器的適用場合
如果要在一台多核機器上提供一種服務或執行一個任務,可用的模式有:
a:運行一個單線程的進程;
b:運行一個多線程的進程;
c:運行多個單線程的進程;
d:運行多個多線程的進程;
考慮這樣的場景:如果使用速率為50MB/s的數據壓縮庫,進程創建銷毀的開銷是800微秒,線程創建銷毀的開銷是50微秒。如何執行壓縮任務?
如果要偶爾壓縮1GB的文本文件,預計運行時間是20s,那麼起一個進程去做是合理的,因為進程啟動和銷毀的開銷遠遠小於實際任務的耗時。
如果要經常壓縮500kB的文本數據,預計運行時間是10ms,那麼每次都起進程 似乎有點浪費了,可以每次單獨起一個線程去做。
如果要頻繁壓縮10kB的文本數據,預計運行時間是200微秒,那麼每次起線程似 乎也很浪費,不如直接在當前線程搞定。也可以用一個線程池,每次把壓縮任務交給線程池,避免阻塞當前線程(特別要避免阻塞IO線程)。
由此可見,多線程並不是萬靈丹(silver bullet)。
1:必須使用單線程的場合
據我所知,有兩種場合必須使用單線程:
a:程序可能會fork(2);
實際編程中,應該保證只有單線程程序能進行fork(2)。多線程程序不是不能調用fork(2),而是這么做會遇到很多麻煩:
fork一般不能在多線程程序中調用,因為Linux的fork只克隆當前線程的thread of control,不可隆其他線程。fork之後,除了當前線程之外,其他線程都消失了。
這就造成一種危險的局面。其他線程可能正好處於臨界區之內,持有了某個鎖,而它突然死亡,再也沒有機會去解鎖了。此時如果子進程試圖再對同一個mutex加鎖,就會立即死鎖。因此,fork之後,子進程就相當於處於signal handler之中(因為不知道調用fork時,父進程中的線程此時正在調用什麼函數,這和信號發生時的場景一樣),你不能調用線程安全的函數(除非它是可重入的),而只能調用非同步信號安全的函數。比如,fork之後,子進程不能調用:
malloc,因為malloc在訪問全局狀態時幾乎肯定會加鎖;
任何可能分配或釋放內存的函數,比如snprintf;
任何Pthreads函數;
printf系列函數,因為其他線程可能恰好持有stdout/stderr的鎖;
除了man 7 signal中明確列出的信號安全函數之外的任何函數。
因此,多線程中調用fork,唯一安全的做法是fork之後,立即調用exec執行另一個程序,徹底隔斷子進程與父進程的聯系。
在多線程環境中調用fork,產生子進程後。子進程內部只存在一個線程,也就是父進程中調用fork的線程的副本。
使用fork創建子進程時,子進程通過繼承整個地址空間的副本,也從父進程那裡繼承了所有互斥量、讀寫鎖和條件變數的狀態。如果父進程中的某個線程佔有鎖,則子進程同樣佔有這些鎖。問題是子進程並不包含佔有鎖的線程的副本,所以子進程沒有辦法知道它佔有了哪些鎖,並且需要釋放哪些鎖。
盡管Pthread提供了pthread_atfork函數試圖繞過這樣的問題,但是這回使得代碼變得混亂。因此《Programming With Posix Threads》一書的作者說:」Avoid using fork in threaded code except where the child process will immediately exec a new program.」。
b:限製程序的CPU佔用率;
這個很容易理解,比如在一個8核的伺服器上,一個單線程程序即便發生busy-wait,占滿1個core,其CPU使用率也只有12.5%,在這種最壞的情況下,系統還是有87.5%的計算資源可供其他服務進程使用。
因此對於一些輔助性的程序,如果它必須和主要服務進程運行在同一台機器的話,那麼做成單線程的能避免過分搶奪系統的計算資源。
❿ linux如何實現多線程
#/bin/bashall_num=10a=$(date +%H%M%S)for num in `seq 1 ${all_num}`do
sleep 1
echo ${num}
done
b=$(date +%H%M%S)
echo -e "startTime:\t$a"echo -e "endTime:\t$b"