① 即時通訊軟體開發的網路編程方式有哪些
引言、即時通訊是網上最為流行的通訊方式,市場上也出現了各種各樣的即時通訊軟體。這篇文章將會給大家介紹一些開發即時通訊軟體的網路編程方式。
開發即時通訊軟體需要用到安卓端技術java語言,蘋果端oc語言,電腦端win系統桌面C/C++語言,管理後台資料庫語言,後台管理界面java或者php。建議可以使用第三方SDK,可以有效地避免消息漏發,卡頓,數據並發等很多問題,提高了用戶對產品的體驗感。
三、如何設置編程。
mysql資料庫的用戶名為root,密碼為空,可以根據自己的需要設置相應的用戶名和密碼(固定在程序中)。mychatserver是聊天伺服器,myfileserver是文件伺服器,文件伺服器負責上傳和下載聊天中發送的文件,myimgserver負責上傳和下載聊天中的圖片。三個服務之間相互獨立,不會互相影響。聊天伺服器監聽埠是20000,文件伺服器埠是20001,圖片伺服器埠號是20002,這三個埠的客戶端連接,其中聊天埠和客戶端是長連接,文件埠和圖片可選擇長連接或短連接。第一次運行mychatserver時,如果能順利連上mysql,mychatserver會自動檢測是否存在名為myim的資料庫,如果不存在就可以創建了,並新建三張信息表,分別是用戶信息表,好友關系表和聊天消息記錄表。第一次啟動文件伺服器時會創建filecache目錄,這個目錄用來存儲聊天中的聊天圖片和離線文件以及客戶端的升級包。為了方便查看代碼,可以用Visual Studio管理代碼,使用VS打開myserver.sln查看和管理代碼。
② 如何看懂《linux多線程服務端編程
一:進程和線程
每個進程有自己獨立的地址空間。「在同一個進程」還是「不在同一個進程」是系統功能劃分的重要決策點。《Erlang程序設計》[ERL]把進程比喻為人:
每個人有自己的記憶(內存),人與人通過談話(消息傳遞)來交流,談話既可以是面談(同一台伺服器),也可以在電話里談(不同的伺服器,有網路通信)。面談和電話談的區別在於,面談可以立即知道對方是否死了(crash,SIGCHLD),而電話談只能通過周期性的心跳來判斷對方是否還活著。
有了這些比喻,設計分布式系統時可以採取「角色扮演」,團隊里的幾個人各自扮演一個進程,人的角色由進程的代碼決定(管登錄的、管消息分發的、管買賣的等等)。每個人有自己的記憶,但不知道別人的記憶,要想知道別人的看法,只能通過交談(暫不考慮共享內存這種IPC)。然後就可以思考:
·容錯:萬一有人突然死了
·擴容:新人中途加進來
·負載均衡:把甲的活兒挪給乙做
·退休:甲要修復bug,先別派新任務,等他做完手上的事情就把他重啟
等等各種場景,十分便利。
線程的特點是共享地址空間,從而可以高效地共享數據。一台機器上的多個進程能高效地共享代碼段(操作系統可以映射為同樣的物理內存),但不能共享數據。如果多個進程大量共享內存,等於是把多進程程序當成多線程來寫,掩耳盜鈴。
「多線程」的價值,我認為是為了更好地發揮多核處理器(multi-cores)的效能。在單核時代,多線程沒有多大價值(個人想法:如果要完成的任務是CPU密集型的,那多線程沒有優勢,甚至因為線程切換的開銷,多線程反而更慢;如果要完成的任務既有CPU計算,又有磁碟或網路IO,則使用多線程的好處是,當某個線程因為IO而阻塞時,OS可以調度其他線程執行,雖然效率確實要比任務的順序執行效率要高,然而,這種類型的任務,可以通過單線程的」non-blocking IO+IO multiplexing」的模型(事件驅動)來提高效率,採用多線程的方式,帶來的可能僅僅是編程上的簡單而已)。Alan Cox說過:」A computer is a state machine.Threads are for people who can』t program state machines.」(計算機是一台狀態機。線程是給那些不能編寫狀態機程序的人准備的)如果只有一塊CPU、一個執行單元,那麼確實如Alan Cox所說,按狀態機的思路去寫程序是最高效的。
二:單線程伺服器的常用編程模型
據我了解,在高性能的網路程序中,使用得最為廣泛的恐怕要數」non-blocking IO + IO multiplexing」這種模型,即Reactor模式。
在」non-blocking IO + IO multiplexing」這種模型中,程序的基本結構是一個事件循環(event loop),以事件驅動(event-driven)和事件回調的方式實現業務邏輯:
[cpp] view plain
//代碼僅為示意,沒有完整考慮各種情況
while(!done)
{
int timeout_ms = max(1000, getNextTimedCallback());
int retval = poll(fds, nfds, timeout_ms);
if (retval<0){
處理錯誤,回調用戶的error handler
}else{
處理到期的timers,回調用戶的timer handler
if(retval>0){
處理IO事件,回調用戶的IO event handler
}
}
}
這里select(2)/poll(2)有伸縮性方面的不足(描述符過多時,效率較低),Linux下可替換為epoll(4),其他操作系統也有對應的高性能替代品。
Reactor模型的優點很明顯,編程不難,效率也不錯。不僅可以用於讀寫socket,連接的建立(connect(2)/accept(2)),甚至DNS解析都可以用非阻塞方式進行,以提高並發度和吞吐量(throughput),對於IO密集的應用是個不錯的選擇。lighttpd就是這樣,它內部的fdevent結構十分精妙,值得學習。
基於事件驅動的編程模型也有其本質的缺點,它要求事件回調函數必須是非阻塞的。對於涉及網路IO的請求響應式協議,它容易割裂業務邏輯,使其散布於多個回調函數之中,相對不容易理解和維護。
三:多線程伺服器的常用編程模型
大概有這么幾種:
a:每個請求創建一個線程,使用阻塞式IO操作。在Java 1.4引人NIO之前,這是Java網路編程的推薦做法。可惜伸縮性不佳(請求太多時,操作系統創建不了這許多線程)。
b:使用線程池,同樣使用阻塞式IO操作。與第1種相比,這是提高性能的措施。
c:使用non-blocking IO + IO multiplexing。即Java NIO的方式。
d:Leader/Follower等高級模式。
在默認情況下,我會使用第3種,即non-blocking IO + one loop per thread模式來編寫多線程C++網路服務程序。
1:one loop per thread
此種模型下,程序里的每個IO線程有一個event loop,用於處理讀寫和定時事件(無論周期性的還是單次的)。代碼框架跟「單線程伺服器的常用編程模型」一節中的一樣。
libev的作者說:
One loop per thread is usually a good model. Doing this is almost never wrong, some times a better-performance model exists, but it is always a good start.
這種方式的好處是:
a:線程數目基本固定,可以在程序啟動的時候設置,不會頻繁創建與銷毀。
b:可以很方便地在線程間調配負載。
c:IO事件發生的線程是固定的,同一個TCP連接不必考慮事件並發。
Event loop代表了線程的主循環,需要讓哪個線程幹活,就把timer或IO channel(如TCP連接)注冊到哪個線程的loop里即可:對實時性有要求的connection可以單獨用一個線程;數據量大的connection可以獨佔一個線程,並把數據處理任務分攤到另幾個計算線程中(用線程池);其他次要的輔助性connections可以共享一個線程。
比如,在dbproxy中,一個線程用於專門處理客戶端發來的管理命令;一個線程用於處理客戶端發來的MySQL命令,而與後端資料庫通信執行該命令時,是將該任務分配給所有事件線程處理的。
對於non-trivial(有一定規模)的服務端程序,一般會採用non-blocking IO + IO multiplexing,每個connection/acceptor都會注冊到某個event loop上,程序里有多個event loop,每個線程至多有一個event loop。
多線程程序對event loop提出了更高的要求,那就是「線程安全」。要允許一個線程往別的線程的loop里塞東西,這個loop必須得是線程安全的。
在dbproxy中,線程向其他線程分發任務,是通過管道和隊列實現的。比如主線程accept到連接後,將表示該連接的結構放入隊列,並向管道中寫入一個位元組。計算線程在自己的event loop中注冊管道的讀事件,一旦有數據可讀,就嘗試從隊列中取任務。
2:線程池
不過,對於沒有IO而光有計算任務的線程,使用event loop有點浪費。可以使用一種補充方案,即用blocking queue實現的任務隊列:
[cpp] view plain
typedef boost::function<void()>Functor;
BlockingQueue<Functor> taskQueue; //線程安全的全局阻塞隊列
//計算線程
void workerThread()
{
while (running) //running變數是個全局標志
{
Functor task = taskQueue.take(); //this blocks
task(); //在產品代碼中需要考慮異常處理
}
}
// 創建容量(並發數)為N的線程池
int N = num_of_computing_threads;
for (int i = 0; i < N; ++i)
{
create_thread(&workerThread); //啟動線程
}
//向任務隊列中追加任務
Foo foo; //Foo有calc()成員函數
boost::function<void()> task = boost::bind(&Foo::calc,&foo);
taskQueue.post(task);
除了任務隊列,還可以用BlockingQueue<T>實現數據的生產者消費者隊列,即T是數據類型而非函數對象,queue的消費者從中拿到數據進行處理。其實本質上是一樣的。
3:總結
總結而言,我推薦的C++多線程服務端編程模式為:one (event) loop per thread + thread pool:
event loop用作IO multiplexing,配合non-blockingIO和定時器;
thread pool用來做計算,具體可以是任務隊列或生產者消費者隊列。
以這種方式寫伺服器程序,需要一個優質的基於Reactor模式的網路庫來支撐,muo正是這樣的網路庫。比如dbproxy使用的是libevent。
程序里具體用幾個loop、線程池的大小等參數需要根據應用來設定,基本的原則是「阻抗匹配」(解釋見下),使得CPU和IO都能高效地運作。所謂阻抗匹配原則:
如果池中線程在執行任務時,密集計算所佔的時間比重為 P (0 < P <= 1),而系統一共有 C 個 CPU,為了讓這 C 個 CPU 跑滿而又不過載,線程池大小的經驗公式 T = C/P。(T 是個 hint,考慮到 P 值的估計不是很准確,T 的最佳值可以上下浮動 50%)
以後我再講這個經驗公式是怎麼來的,先驗證邊界條件的正確性。
假設 C = 8,P = 1.0,線程池的任務完全是密集計算,那麼T = 8。只要 8 個活動線程就能讓 8 個 CPU 飽和,再多也沒用,因為 CPU 資源已經耗光了。
假設 C = 8,P = 0.5,線程池的任務有一半是計算,有一半等在 IO 上,那麼T = 16。考慮操作系統能靈活合理地調度 sleeping/writing/running 線程,那麼大概 16 個「50%繁忙的線程」能讓 8 個 CPU 忙個不停。啟動更多的線程並不能提高吞吐量,反而因為增加上下文切換的開銷而降低性能。
如果 P < 0.2,這個公式就不適用了,T 可以取一個固定值,比如 5*C。
另外,公式里的 C 不一定是 CPU 總數,可以是「分配給這項任務的 CPU 數目」,比如在 8 核機器上分出 4 個核來做一項任務,那麼 C=4。
四:進程間通信只用TCP
Linux下進程間通信的方式有:匿名管道(pipe)、具名管道(FIFO)、POSIX消息隊列、共享內存、信號(signals),以及Socket。同步原語有互斥器(mutex)、條件變數(condition variable)、讀寫鎖(reader-writer lock)、文件鎖(record locking)、信號量(semaphore)等等。
進程間通信我首選Sockets(主要指TCP,我沒有用過UDP,也不考慮Unix domain協議)。其好處在於:
可以跨主機,具有伸縮性。反正都是多進程了,如果一台機器的處理能力不夠,很自然地就能用多台機器來處理。把進程分散到同一區域網的多台機器上,程序改改host:port配置就能繼續用;
TCP sockets和pipe都是操作文件描述符,用來收發位元組流,都可以read/write/fcntl/select/poll等。不同的是,TCP是雙向的,Linux的pipe是單向的,進程間雙向通信還得開兩個文件描述符,不方便;而且進程要有父子關系才能用pipe,這些都限制了pipe的使用;
TCP port由一個進程獨占,且進程退出時操作系統會自動回收文件描述符。因此即使程序意外退出,也不會給系統留下垃圾,程序重啟之後能比較容易地恢復,而不需要重啟操作系統(用跨進程的mutex就有這個風險);而且,port是獨占的,可以防止程序重復啟動,後面那個進程搶不到port,自然就沒法初始化了,避免造成意料之外的結果;
與其他IPC相比,TCP協議的一個天生的好處是「可記錄、可重現」。tcpmp和Wireshark是解決兩個進程間協議和狀態爭端的好幫手,也是性能(吞吐量、延遲)分析的利器。我們可以藉此編寫分布式程序的自動化回歸測試。也可以用tcp之類的工具進行壓力測試。TCP還能跨語言,服務端和客戶端不必使用同一種語言。
分布式系統的軟體設計和功能劃分一般應該以「進程」為單位。從宏觀上看,一個分布式系統是由運行在多台機器上的多個進程組成的,進程之間採用TCP長連接通信。
使用TCP長連接的好處有兩點:一是容易定位分布式系統中的服務之間的依賴關系。只要在機器上運行netstat -tpna|grep <port>就能立刻列出用到某服務的客戶端地址(Foreign Address列),然後在客戶端的機器上用netstat或lsof命令找出是哪個進程發起的連接。TCP短連接和UDP則不具備這一特性。二是通過接收和發送隊列的長度也較容易定位網路或程序故障。在正常運行的時候,netstat列印的Recv-Q和Send-Q都應該接近0,或者在0附近擺動。如果Recv-Q保持不變或持續增加,則通常意味著服務進程的處理速度變慢,可能發生了死鎖或阻塞。如果Send-Q保持不變或持續增加,有可能是對方伺服器太忙、來不及處理,也有可能是網路中間某個路由器或交換機故障造成丟包,甚至對方伺服器掉線,這些因素都可能表現為數據發送不出去。通過持續監控Recv-Q和Send-Q就能及早預警性能或可用性故障。以下是服務端線程阻塞造成Recv-Q和客戶端Send-Q激增的例子:
[cpp] view plain
$netstat -tn
Proto Recv-Q Send-Q Local Address Foreign
tcp 78393 0 10.0.0.10:2000 10.0.0.10:39748 #服務端連接
tcp 0 132608 10.0.0.10:39748 10.0.0.10:2000 #客戶端連接
tcp 0 52 10.0.0.10:22 10.0.0.4:55572
五:多線程伺服器的適用場合
如果要在一台多核機器上提供一種服務或執行一個任務,可用的模式有:
a:運行一個單線程的進程;
b:運行一個多線程的進程;
c:運行多個單線程的進程;
d:運行多個多線程的進程;
考慮這樣的場景:如果使用速率為50MB/s的數據壓縮庫,進程創建銷毀的開銷是800微秒,線程創建銷毀的開銷是50微秒。如何執行壓縮任務?
如果要偶爾壓縮1GB的文本文件,預計運行時間是20s,那麼起一個進程去做是合理的,因為進程啟動和銷毀的開銷遠遠小於實際任務的耗時。
如果要經常壓縮500kB的文本數據,預計運行時間是10ms,那麼每次都起進程 似乎有點浪費了,可以每次單獨起一個線程去做。
如果要頻繁壓縮10kB的文本數據,預計運行時間是200微秒,那麼每次起線程似 乎也很浪費,不如直接在當前線程搞定。也可以用一個線程池,每次把壓縮任務交給線程池,避免阻塞當前線程(特別要避免阻塞IO線程)。
由此可見,多線程並不是萬靈丹(silver bullet)。
1:必須使用單線程的場合
據我所知,有兩種場合必須使用單線程:
a:程序可能會fork(2);
實際編程中,應該保證只有單線程程序能進行fork(2)。多線程程序不是不能調用fork(2),而是這么做會遇到很多麻煩:
fork一般不能在多線程程序中調用,因為Linux的fork只克隆當前線程的thread of control,不可隆其他線程。fork之後,除了當前線程之外,其他線程都消失了。
這就造成一種危險的局面。其他線程可能正好處於臨界區之內,持有了某個鎖,而它突然死亡,再也沒有機會去解鎖了。此時如果子進程試圖再對同一個mutex加鎖,就會立即死鎖。因此,fork之後,子進程就相當於處於signal handler之中(因為不知道調用fork時,父進程中的線程此時正在調用什麼函數,這和信號發生時的場景一樣),你不能調用線程安全的函數(除非它是可重入的),而只能調用非同步信號安全的函數。比如,fork之後,子進程不能調用:
malloc,因為malloc在訪問全局狀態時幾乎肯定會加鎖;
任何可能分配或釋放內存的函數,比如snprintf;
任何Pthreads函數;
printf系列函數,因為其他線程可能恰好持有stdout/stderr的鎖;
除了man 7 signal中明確列出的信號安全函數之外的任何函數。
因此,多線程中調用fork,唯一安全的做法是fork之後,立即調用exec執行另一個程序,徹底隔斷子進程與父進程的聯系。
在多線程環境中調用fork,產生子進程後。子進程內部只存在一個線程,也就是父進程中調用fork的線程的副本。
使用fork創建子進程時,子進程通過繼承整個地址空間的副本,也從父進程那裡繼承了所有互斥量、讀寫鎖和條件變數的狀態。如果父進程中的某個線程佔有鎖,則子進程同樣佔有這些鎖。問題是子進程並不包含佔有鎖的線程的副本,所以子進程沒有辦法知道它佔有了哪些鎖,並且需要釋放哪些鎖。
盡管Pthread提供了pthread_atfork函數試圖繞過這樣的問題,但是這回使得代碼變得混亂。因此《Programming With Posix Threads》一書的作者說:」Avoid using fork in threaded code except where the child process will immediately exec a new program.」。
b:限製程序的CPU佔用率;
這個很容易理解,比如在一個8核的伺服器上,一個單線程程序即便發生busy-wait,占滿1個core,其CPU使用率也只有12.5%,在這種最壞的情況下,系統還是有87.5%的計算資源可供其他服務進程使用。
因此對於一些輔助性的程序,如果它必須和主要服務進程運行在同一台機器的話,那麼做成單線程的能避免過分搶奪系統的計算資源。
③ java Netty NIO 如何突破 65536 個埠的限制如何做到 10萬~50萬 長連接
首先說一下 伺服器是只監聽一個埠,所有的客戶端連接,都是連接到伺服器的同一個埠上的。也就是說伺服器只是用了一個埠。就比如Http伺服器。默認只用了80埠。
這是解答一些人的這個疑惑。
下面來回答你的問題
nio 在linux上使用的是epoll ,epoll支持在一個進程中打開的FD是操作系統最大文件句柄數,而不是你所說的16位short表示的文件句柄。 而 select模型 單進程打開的FD是受限的 select模型默認FD是1024 。操作系統最大文件句柄數跟內存有關,1GB內存的機器上,大概是10萬個句柄左右。可以通過cat /proc/sys/fs/file-max 查看
這個可以在Netty權威指南第二版的第七頁看到。
我ubuntu虛擬機,2G內存。結果是 200496
2019/05/09 修正一下上面讓人誤會的地方
「伺服器是只監聽一個埠」 這句話 請參照這一段的最後一行 「就比如Http伺服器默認只用了80埠」 我這一段話里說的伺服器並不是指伺服器主機 硬體, 而是說 服務程序。 一個伺服器主機操作系統上 可以運行很多服務程序, 而通常都會說 Netty伺服器、Apache伺服器、tomcat伺服器、Mysql伺服器 , 這里是指 Netty服務端 Apache服務端 tomcat服務端 Mysql服務端 。 再比如 一個游戲的登錄伺服器 沒人會叫他 游戲Netty服務程序 或者Netty登錄服務程序 , 而會稱呼它是 Netty伺服器或者登錄伺服器 或者xxx游戲登錄伺服器之類的。 只是依照行業術語來說的 被誤會了很抱歉 這里解釋一下 。
再次回答一下這個問題 Netty NIO不用突破65536個埠限制 因為根本沒有這個埠限制問題 只有主動發起一個請求 才會佔用一個本地埠 主動發起10個請求 會佔用10個本地埠 我這里說的是長連接 Netty NIO是屬於服務程序 他只需要監聽一個埠 比如8000埠 這時候有10個客戶端 連接到這個Netty伺服器 都是10個客戶端全都連接到伺服器的8000埠 服務端只會佔用8000埠這一個埠 所以不需要突破65536埠限制
④ socket高並發網路編程服務端有什麼框架
netty;
PayServer.java
package com.miri.pay.scoket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PayServer implements Runnable
{
private static final Logger DLOG = LoggerFactory.getLogger(PayServer.class);
private final int port;
public PayServer(int port)
{
this.port = port;
}
/**
* 為ServerBootstrap綁定指定埠
*/
public void run()
{
// 用於接收發來的連接請求
final EventLoopGroup bossGroup = new NioEventLoopGroup();
// 用於處理boss接受並且注冊給worker的連接中的信息
final EventLoopGroup workerGroup = new NioEventLoopGroup();
try
{
// 配置伺服器
final ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG, 128);
// 通過NoDelay禁用Nagle,使消息立即發出去,不用等待到一定的數據量才發出去
bootstrap.option(ChannelOption.TCP_NODELAY, true);
// 保持長連接狀態
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
// CustomChannelInitializer是一個特殊的handler,用於方便的配置用戶自定義的handler實現
bootstrap.childHandler(new CustomChannelInitializer());
// 綁定並開始接受傳入的連接
final ChannelFuture future = bootstrap.bind(this.port).sync();
if (future.isSuccess())
{
PayServer.DLOG.info("Start the socket server {} success", this.port);
}
else
{
PayServer.DLOG.info("Start the socket server {} failure,System exit!", this.port);
throw new RuntimeException("Socket服務端啟動失敗");
}
// 等待伺服器套接字關閉
// 關閉伺服器
future.channel().closeFuture().sync();
}
catch (final InterruptedException e)
{
PayServer.DLOG.error("Close the socket server exception occurs,System exit!", e);
throw new RuntimeException("關閉Socket服務端失敗");
}
finally
{
// 關閉所有事件循環終止線程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
/**
* 特殊的內部類
* <p>
* 是一個特殊的handler,用於方便的配置用戶自定義的handler實現
* @author xulonghui
*/
static class CustomChannelInitializer extends ChannelInitializer<SocketChannel>
{
@Override
protected void initChannel(SocketChannel ch) throws Exception
{
final ChannelPipeline p = ch.pipeline();
p.addLast(new PayMessageEncoder());
p.addLast(new PayMessageDecoder());
p.addLast(new PayServerHandler());
}
}
}
PayMessageEncoder.java
package com.miri.pay.scoket;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;
import com.miri.pay.model.CommonResponse;
import com.miri.pay.utils.JsonUtil;
/**
*消息編碼器
* <p>
* 編碼從服務端發送出的消息
*/
public class PayMessageEncoder extends MessageToByteEncoder<CommonResponse>
{
@Override
protected void encode(ChannelHandlerContext ctx, CommonResponse rsp, ByteBuf out) throws Exception
{
if (rsp != null)
{
final Object msgContent = rsp.getMsgContent();
// 消息ID,sequenceId和entityId三個加起來是12個長度
int msgLen = 12;
byte[] contentbs = new byte[] {};
if (msgContent != null)
{
final String content = JsonUtil.bean2json(msgContent);
contentbs = content.getBytes(CharsetUtil.UTF_8);
final int cl = contentbs.length;
msgLen += cl;
}
out.writeInt(msgLen);// 寫入當前消息的總長度
out.writeInt(rsp.getMsgId());// 寫入當前消息的消息ID
out.writeInt(rsp.getSequenceId());// 寫入當前消息的SequenceId
out.writeInt(rsp.getEntityId());// 寫入當前消息的EntityId
// 寫入消息主體內容
if (contentbs.length > 0)
{
out.writeBytes(contentbs);
}
}
}
}
PayMessageDecoder.java
package com.miri.pay.scoket;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.CharsetUtil;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.miri.pay.constants.Constants;
import com.miri.pay.model.CommonRequest;
import com.miri.pay.utils.ByteUtil;
/**
* 消息解碼器
* <p>
* 解碼從客戶端請求的消息
*/
public class PayMessageDecoder extends ByteToMessageDecoder
{
private static final Logger DLOG = LoggerFactory.getLogger(PayMessageDecoder.class);
/**
* 表示頭長度的位元組數
*/
private static final int HEAD_LENGTH = 4;
/**
* 所有ID串所屬的位元組數
*/
private static final int ID_STR_LENGTH = 12;
/**
* 單個ID所屬的位元組數
*/
private static final int SINGLE_ID_LENGTH = 4;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
{
int readable = in.readableBytes();
if (readable < PayMessageDecoder.HEAD_LENGTH)
{
return;
}
in.markReaderIndex(); // 我們標記一下當前的readIndex的位置
final int dataLength = in.readInt(); // 讀取傳送過來的消息的長度。ByteBuf 的readInt()方法會讓他的readIndex增加4
if (dataLength < 0)
{
// 我們讀到的消息體長度為0,這是不應該出現的情況,這里出現這情況,關閉連接。
ctx.close();
}
readable = in.readableBytes();
if (readable < dataLength)
{
// 讀到的消息體長度如果小於我們傳送過來的消息長度,則resetReaderIndex. 這個配合markReaderIndex使用的。把readIndex重置到mark的地方
in.resetReaderIndex();
return;
}
final byte[] body = new byte[dataLength];
in.readBytes(body);
// 判斷是否讀取到內容
final int length = body.length;
if (length == 0)
{
return;// 若未讀出任何內容,則忽略
}
out.add(this.byte2req(body));
}
/**
* 將讀取到的byte數據轉換為請求對象
* @param body
* @return
* @throws Exception
*/
private CommonRequest byte2req(byte[] body) throws Exception
{
final CommonRequest req = new CommonRequest(Constants.INVALID_MSGID);
final int length = body.length;
// 若內容數組的長度小於或等於12,則表示消息主體內容為空,直接返回一個無效的消息出去
if (length < PayMessageDecoder.ID_STR_LENGTH)
{
PayMessageDecoder.DLOG
.info("The client sends the message length is: {}, is invalid message, directly returns a msgId = {} request entity",
length, Constants.INVALID_MSGID);
return req;
}
// 獲取消息ID
final byte[] mbs = new byte[PayMessageDecoder.SINGLE_ID_LENGTH];
System.array(body, 0, mbs, 0, PayMessageDecoder.SINGLE_ID_LENGTH);
final int msgId = ByteUtil.byte4toint(mbs);
req.setMsgId(msgId);
// 獲取sequenceId
final byte[] sbs = new byte[PayMessageDecoder.SINGLE_ID_LENGTH];
System.array(body, 4, sbs, 0, PayMessageDecoder.SINGLE_ID_LENGTH);
final int sequenceId = ByteUtil.byte4toint(sbs);
req.setSequenceId(sequenceId);
// 獲取entityId
final byte[] ebs = new byte[PayMessageDecoder.SINGLE_ID_LENGTH];
System.array(body, 8, ebs, 0, PayMessageDecoder.SINGLE_ID_LENGTH);
final int entityId = ByteUtil.byte4toint(ebs);
req.setEntityId(entityId);
// 獲取消息主體內容
if (length > PayMessageDecoder.ID_STR_LENGTH)
{
final int contentLen = length - PayMessageDecoder.ID_STR_LENGTH;
final byte[] contentbs = new byte[contentLen];
System.array(body, 12, contentbs, 0, contentLen);
final String content = new String(contentbs, CharsetUtil.UTF_8);
req.setMsgContent(content);
}
return req;
}
}
PayServerHandler.java
package com.miri.pay.scoket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.miri.pay.MessageQueue;
import com.miri.pay.model.CommonRequest;
import com.miri.pay.model.PendingBean;
/**
* Socket服務端處理器
*/
public class PayServerHandler extends ChannelInboundHandlerAdapter
{
private static final Logger DLOG = LoggerFactory.getLogger(PayServerHandler.class);
/**
* 外部訂單號-頻道
*/
public static final Map<String, Channel> CHANNELS = new HashMap<String, Channel>();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
try
{
PayServerHandler.DLOG.info("Client send to msg is: {}", msg);
final CommonRequest request = (CommonRequest) msg;
final PendingBean bean = new PendingBean(ctx.channel(), request);
MessageQueue.offer(bean);
}
finally
{
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
{
ctx.flush();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
super.channelActive(ctx);
final Channel channel = ctx.channel();
PayServerHandler.DLOG.info("Client active form {}", channel.remoteAddress());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
{
super.channelInactive(ctx);
final Channel channel = ctx.channel();
PayServerHandler.DLOG.info("Client inactive form {}", channel.remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
PayServerHandler.DLOG.error("System exception", cause);
ctx.close();
}
}