① 即时通讯软件开发的网络编程方式有哪些
引言、即时通讯是网上最为流行的通讯方式,市场上也出现了各种各样的即时通讯软件。这篇文章将会给大家介绍一些开发即时通讯软件的网络编程方式。
开发即时通讯软件需要用到安卓端技术java语言,苹果端oc语言,电脑端win系统桌面C/C++语言,管理后台数据库语言,后台管理界面java或者php。建议可以使用第三方SDK,可以有效地避免消息漏发,卡顿,数据并发等很多问题,提高了用户对产品的体验感。
三、如何设置编程。
mysql数据库的用户名为root,密码为空,可以根据自己的需要设置相应的用户名和密码(固定在程序中)。mychatserver是聊天服务器,myfileserver是文件服务器,文件服务器负责上传和下载聊天中发送的文件,myimgserver负责上传和下载聊天中的图片。三个服务之间相互独立,不会互相影响。聊天服务器监听端口是20000,文件服务器端口是20001,图片服务器端口号是20002,这三个端口的客户端连接,其中聊天端口和客户端是长连接,文件端口和图片可选择长连接或短连接。第一次运行mychatserver时,如果能顺利连上mysql,mychatserver会自动检测是否存在名为myim的数据库,如果不存在就可以创建了,并新建三张信息表,分别是用户信息表,好友关系表和聊天消息记录表。第一次启动文件服务器时会创建filecache目录,这个目录用来存储聊天中的聊天图片和离线文件以及客户端的升级包。为了方便查看代码,可以用Visual Studio管理代码,使用VS打开myserver.sln查看和管理代码。
② 如何看懂《linux多线程服务端编程
一:进程和线程
每个进程有自己独立的地址空间。“在同一个进程”还是“不在同一个进程”是系统功能划分的重要决策点。《Erlang程序设计》[ERL]把进程比喻为人:
每个人有自己的记忆(内存),人与人通过谈话(消息传递)来交流,谈话既可以是面谈(同一台服务器),也可以在电话里谈(不同的服务器,有网络通信)。面谈和电话谈的区别在于,面谈可以立即知道对方是否死了(crash,SIGCHLD),而电话谈只能通过周期性的心跳来判断对方是否还活着。
有了这些比喻,设计分布式系统时可以采取“角色扮演”,团队里的几个人各自扮演一个进程,人的角色由进程的代码决定(管登录的、管消息分发的、管买卖的等等)。每个人有自己的记忆,但不知道别人的记忆,要想知道别人的看法,只能通过交谈(暂不考虑共享内存这种IPC)。然后就可以思考:
·容错:万一有人突然死了
·扩容:新人中途加进来
·负载均衡:把甲的活儿挪给乙做
·退休:甲要修复bug,先别派新任务,等他做完手上的事情就把他重启
等等各种场景,十分便利。
线程的特点是共享地址空间,从而可以高效地共享数据。一台机器上的多个进程能高效地共享代码段(操作系统可以映射为同样的物理内存),但不能共享数据。如果多个进程大量共享内存,等于是把多进程程序当成多线程来写,掩耳盗铃。
“多线程”的价值,我认为是为了更好地发挥多核处理器(multi-cores)的效能。在单核时代,多线程没有多大价值(个人想法:如果要完成的任务是CPU密集型的,那多线程没有优势,甚至因为线程切换的开销,多线程反而更慢;如果要完成的任务既有CPU计算,又有磁盘或网络IO,则使用多线程的好处是,当某个线程因为IO而阻塞时,OS可以调度其他线程执行,虽然效率确实要比任务的顺序执行效率要高,然而,这种类型的任务,可以通过单线程的”non-blocking IO+IO multiplexing”的模型(事件驱动)来提高效率,采用多线程的方式,带来的可能仅仅是编程上的简单而已)。Alan Cox说过:”A computer is a state machine.Threads are for people who can’t program state machines.”(计算机是一台状态机。线程是给那些不能编写状态机程序的人准备的)如果只有一块CPU、一个执行单元,那么确实如Alan Cox所说,按状态机的思路去写程序是最高效的。
二:单线程服务器的常用编程模型
据我了解,在高性能的网络程序中,使用得最为广泛的恐怕要数”non-blocking IO + IO multiplexing”这种模型,即Reactor模式。
在”non-blocking IO + IO multiplexing”这种模型中,程序的基本结构是一个事件循环(event loop),以事件驱动(event-driven)和事件回调的方式实现业务逻辑:
[cpp] view plain
//代码仅为示意,没有完整考虑各种情况
while(!done)
{
int timeout_ms = max(1000, getNextTimedCallback());
int retval = poll(fds, nfds, timeout_ms);
if (retval<0){
处理错误,回调用户的error handler
}else{
处理到期的timers,回调用户的timer handler
if(retval>0){
处理IO事件,回调用户的IO event handler
}
}
}
这里select(2)/poll(2)有伸缩性方面的不足(描述符过多时,效率较低),Linux下可替换为epoll(4),其他操作系统也有对应的高性能替代品。
Reactor模型的优点很明显,编程不难,效率也不错。不仅可以用于读写socket,连接的建立(connect(2)/accept(2)),甚至DNS解析都可以用非阻塞方式进行,以提高并发度和吞吐量(throughput),对于IO密集的应用是个不错的选择。lighttpd就是这样,它内部的fdevent结构十分精妙,值得学习。
基于事件驱动的编程模型也有其本质的缺点,它要求事件回调函数必须是非阻塞的。对于涉及网络IO的请求响应式协议,它容易割裂业务逻辑,使其散布于多个回调函数之中,相对不容易理解和维护。
三:多线程服务器的常用编程模型
大概有这么几种:
a:每个请求创建一个线程,使用阻塞式IO操作。在Java 1.4引人NIO之前,这是Java网络编程的推荐做法。可惜伸缩性不佳(请求太多时,操作系统创建不了这许多线程)。
b:使用线程池,同样使用阻塞式IO操作。与第1种相比,这是提高性能的措施。
c:使用non-blocking IO + IO multiplexing。即Java NIO的方式。
d:Leader/Follower等高级模式。
在默认情况下,我会使用第3种,即non-blocking IO + one loop per thread模式来编写多线程C++网络服务程序。
1:one loop per thread
此种模型下,程序里的每个IO线程有一个event loop,用于处理读写和定时事件(无论周期性的还是单次的)。代码框架跟“单线程服务器的常用编程模型”一节中的一样。
libev的作者说:
One loop per thread is usually a good model. Doing this is almost never wrong, some times a better-performance model exists, but it is always a good start.
这种方式的好处是:
a:线程数目基本固定,可以在程序启动的时候设置,不会频繁创建与销毁。
b:可以很方便地在线程间调配负载。
c:IO事件发生的线程是固定的,同一个TCP连接不必考虑事件并发。
Event loop代表了线程的主循环,需要让哪个线程干活,就把timer或IO channel(如TCP连接)注册到哪个线程的loop里即可:对实时性有要求的connection可以单独用一个线程;数据量大的connection可以独占一个线程,并把数据处理任务分摊到另几个计算线程中(用线程池);其他次要的辅助性connections可以共享一个线程。
比如,在dbproxy中,一个线程用于专门处理客户端发来的管理命令;一个线程用于处理客户端发来的MySQL命令,而与后端数据库通信执行该命令时,是将该任务分配给所有事件线程处理的。
对于non-trivial(有一定规模)的服务端程序,一般会采用non-blocking IO + IO multiplexing,每个connection/acceptor都会注册到某个event loop上,程序里有多个event loop,每个线程至多有一个event loop。
多线程程序对event loop提出了更高的要求,那就是“线程安全”。要允许一个线程往别的线程的loop里塞东西,这个loop必须得是线程安全的。
在dbproxy中,线程向其他线程分发任务,是通过管道和队列实现的。比如主线程accept到连接后,将表示该连接的结构放入队列,并向管道中写入一个字节。计算线程在自己的event loop中注册管道的读事件,一旦有数据可读,就尝试从队列中取任务。
2:线程池
不过,对于没有IO而光有计算任务的线程,使用event loop有点浪费。可以使用一种补充方案,即用blocking queue实现的任务队列:
[cpp] view plain
typedef boost::function<void()>Functor;
BlockingQueue<Functor> taskQueue; //线程安全的全局阻塞队列
//计算线程
void workerThread()
{
while (running) //running变量是个全局标志
{
Functor task = taskQueue.take(); //this blocks
task(); //在产品代码中需要考虑异常处理
}
}
// 创建容量(并发数)为N的线程池
int N = num_of_computing_threads;
for (int i = 0; i < N; ++i)
{
create_thread(&workerThread); //启动线程
}
//向任务队列中追加任务
Foo foo; //Foo有calc()成员函数
boost::function<void()> task = boost::bind(&Foo::calc,&foo);
taskQueue.post(task);
除了任务队列,还可以用BlockingQueue<T>实现数据的生产者消费者队列,即T是数据类型而非函数对象,queue的消费者从中拿到数据进行处理。其实本质上是一样的。
3:总结
总结而言,我推荐的C++多线程服务端编程模式为:one (event) loop per thread + thread pool:
event loop用作IO multiplexing,配合non-blockingIO和定时器;
thread pool用来做计算,具体可以是任务队列或生产者消费者队列。
以这种方式写服务器程序,需要一个优质的基于Reactor模式的网络库来支撑,muo正是这样的网络库。比如dbproxy使用的是libevent。
程序里具体用几个loop、线程池的大小等参数需要根据应用来设定,基本的原则是“阻抗匹配”(解释见下),使得CPU和IO都能高效地运作。所谓阻抗匹配原则:
如果池中线程在执行任务时,密集计算所占的时间比重为 P (0 < P <= 1),而系统一共有 C 个 CPU,为了让这 C 个 CPU 跑满而又不过载,线程池大小的经验公式 T = C/P。(T 是个 hint,考虑到 P 值的估计不是很准确,T 的最佳值可以上下浮动 50%)
以后我再讲这个经验公式是怎么来的,先验证边界条件的正确性。
假设 C = 8,P = 1.0,线程池的任务完全是密集计算,那么T = 8。只要 8 个活动线程就能让 8 个 CPU 饱和,再多也没用,因为 CPU 资源已经耗光了。
假设 C = 8,P = 0.5,线程池的任务有一半是计算,有一半等在 IO 上,那么T = 16。考虑操作系统能灵活合理地调度 sleeping/writing/running 线程,那么大概 16 个“50%繁忙的线程”能让 8 个 CPU 忙个不停。启动更多的线程并不能提高吞吐量,反而因为增加上下文切换的开销而降低性能。
如果 P < 0.2,这个公式就不适用了,T 可以取一个固定值,比如 5*C。
另外,公式里的 C 不一定是 CPU 总数,可以是“分配给这项任务的 CPU 数目”,比如在 8 核机器上分出 4 个核来做一项任务,那么 C=4。
四:进程间通信只用TCP
Linux下进程间通信的方式有:匿名管道(pipe)、具名管道(FIFO)、POSIX消息队列、共享内存、信号(signals),以及Socket。同步原语有互斥器(mutex)、条件变量(condition variable)、读写锁(reader-writer lock)、文件锁(record locking)、信号量(semaphore)等等。
进程间通信我首选Sockets(主要指TCP,我没有用过UDP,也不考虑Unix domain协议)。其好处在于:
可以跨主机,具有伸缩性。反正都是多进程了,如果一台机器的处理能力不够,很自然地就能用多台机器来处理。把进程分散到同一局域网的多台机器上,程序改改host:port配置就能继续用;
TCP sockets和pipe都是操作文件描述符,用来收发字节流,都可以read/write/fcntl/select/poll等。不同的是,TCP是双向的,Linux的pipe是单向的,进程间双向通信还得开两个文件描述符,不方便;而且进程要有父子关系才能用pipe,这些都限制了pipe的使用;
TCP port由一个进程独占,且进程退出时操作系统会自动回收文件描述符。因此即使程序意外退出,也不会给系统留下垃圾,程序重启之后能比较容易地恢复,而不需要重启操作系统(用跨进程的mutex就有这个风险);而且,port是独占的,可以防止程序重复启动,后面那个进程抢不到port,自然就没法初始化了,避免造成意料之外的结果;
与其他IPC相比,TCP协议的一个天生的好处是“可记录、可重现”。tcpmp和Wireshark是解决两个进程间协议和状态争端的好帮手,也是性能(吞吐量、延迟)分析的利器。我们可以借此编写分布式程序的自动化回归测试。也可以用tcp之类的工具进行压力测试。TCP还能跨语言,服务端和客户端不必使用同一种语言。
分布式系统的软件设计和功能划分一般应该以“进程”为单位。从宏观上看,一个分布式系统是由运行在多台机器上的多个进程组成的,进程之间采用TCP长连接通信。
使用TCP长连接的好处有两点:一是容易定位分布式系统中的服务之间的依赖关系。只要在机器上运行netstat -tpna|grep <port>就能立刻列出用到某服务的客户端地址(Foreign Address列),然后在客户端的机器上用netstat或lsof命令找出是哪个进程发起的连接。TCP短连接和UDP则不具备这一特性。二是通过接收和发送队列的长度也较容易定位网络或程序故障。在正常运行的时候,netstat打印的Recv-Q和Send-Q都应该接近0,或者在0附近摆动。如果Recv-Q保持不变或持续增加,则通常意味着服务进程的处理速度变慢,可能发生了死锁或阻塞。如果Send-Q保持不变或持续增加,有可能是对方服务器太忙、来不及处理,也有可能是网络中间某个路由器或交换机故障造成丢包,甚至对方服务器掉线,这些因素都可能表现为数据发送不出去。通过持续监控Recv-Q和Send-Q就能及早预警性能或可用性故障。以下是服务端线程阻塞造成Recv-Q和客户端Send-Q激增的例子:
[cpp] view plain
$netstat -tn
Proto Recv-Q Send-Q Local Address Foreign
tcp 78393 0 10.0.0.10:2000 10.0.0.10:39748 #服务端连接
tcp 0 132608 10.0.0.10:39748 10.0.0.10:2000 #客户端连接
tcp 0 52 10.0.0.10:22 10.0.0.4:55572
五:多线程服务器的适用场合
如果要在一台多核机器上提供一种服务或执行一个任务,可用的模式有:
a:运行一个单线程的进程;
b:运行一个多线程的进程;
c:运行多个单线程的进程;
d:运行多个多线程的进程;
考虑这样的场景:如果使用速率为50MB/s的数据压缩库,进程创建销毁的开销是800微秒,线程创建销毁的开销是50微秒。如何执行压缩任务?
如果要偶尔压缩1GB的文本文件,预计运行时间是20s,那么起一个进程去做是合理的,因为进程启动和销毁的开销远远小于实际任务的耗时。
如果要经常压缩500kB的文本数据,预计运行时间是10ms,那么每次都起进程 似乎有点浪费了,可以每次单独起一个线程去做。
如果要频繁压缩10kB的文本数据,预计运行时间是200微秒,那么每次起线程似 乎也很浪费,不如直接在当前线程搞定。也可以用一个线程池,每次把压缩任务交给线程池,避免阻塞当前线程(特别要避免阻塞IO线程)。
由此可见,多线程并不是万灵丹(silver bullet)。
1:必须使用单线程的场合
据我所知,有两种场合必须使用单线程:
a:程序可能会fork(2);
实际编程中,应该保证只有单线程程序能进行fork(2)。多线程程序不是不能调用fork(2),而是这么做会遇到很多麻烦:
fork一般不能在多线程程序中调用,因为Linux的fork只克隆当前线程的thread of control,不可隆其他线程。fork之后,除了当前线程之外,其他线程都消失了。
这就造成一种危险的局面。其他线程可能正好处于临界区之内,持有了某个锁,而它突然死亡,再也没有机会去解锁了。此时如果子进程试图再对同一个mutex加锁,就会立即死锁。因此,fork之后,子进程就相当于处于signal handler之中(因为不知道调用fork时,父进程中的线程此时正在调用什么函数,这和信号发生时的场景一样),你不能调用线程安全的函数(除非它是可重入的),而只能调用异步信号安全的函数。比如,fork之后,子进程不能调用:
malloc,因为malloc在访问全局状态时几乎肯定会加锁;
任何可能分配或释放内存的函数,比如snprintf;
任何Pthreads函数;
printf系列函数,因为其他线程可能恰好持有stdout/stderr的锁;
除了man 7 signal中明确列出的信号安全函数之外的任何函数。
因此,多线程中调用fork,唯一安全的做法是fork之后,立即调用exec执行另一个程序,彻底隔断子进程与父进程的联系。
在多线程环境中调用fork,产生子进程后。子进程内部只存在一个线程,也就是父进程中调用fork的线程的副本。
使用fork创建子进程时,子进程通过继承整个地址空间的副本,也从父进程那里继承了所有互斥量、读写锁和条件变量的状态。如果父进程中的某个线程占有锁,则子进程同样占有这些锁。问题是子进程并不包含占有锁的线程的副本,所以子进程没有办法知道它占有了哪些锁,并且需要释放哪些锁。
尽管Pthread提供了pthread_atfork函数试图绕过这样的问题,但是这回使得代码变得混乱。因此《Programming With Posix Threads》一书的作者说:”Avoid using fork in threaded code except where the child process will immediately exec a new program.”。
b:限制程序的CPU占用率;
这个很容易理解,比如在一个8核的服务器上,一个单线程程序即便发生busy-wait,占满1个core,其CPU使用率也只有12.5%,在这种最坏的情况下,系统还是有87.5%的计算资源可供其他服务进程使用。
因此对于一些辅助性的程序,如果它必须和主要服务进程运行在同一台机器的话,那么做成单线程的能避免过分抢夺系统的计算资源。
③ java Netty NIO 如何突破 65536 个端口的限制如何做到 10万~50万 长连接
首先说一下 服务器是只监听一个端口,所有的客户端连接,都是连接到服务器的同一个端口上的。也就是说服务器只是用了一个端口。就比如Http服务器。默认只用了80端口。
这是解答一些人的这个疑惑。
下面来回答你的问题
nio 在linux上使用的是epoll ,epoll支持在一个进程中打开的FD是操作系统最大文件句柄数,而不是你所说的16位short表示的文件句柄。 而 select模型 单进程打开的FD是受限的 select模型默认FD是1024 。操作系统最大文件句柄数跟内存有关,1GB内存的机器上,大概是10万个句柄左右。可以通过cat /proc/sys/fs/file-max 查看
这个可以在Netty权威指南第二版的第七页看到。
我ubuntu虚拟机,2G内存。结果是 200496
2019/05/09 修正一下上面让人误会的地方
“服务器是只监听一个端口” 这句话 请参照这一段的最后一行 “就比如Http服务器默认只用了80端口” 我这一段话里说的服务器并不是指服务器主机 硬件, 而是说 服务程序。 一个服务器主机操作系统上 可以运行很多服务程序, 而通常都会说 Netty服务器、Apache服务器、tomcat服务器、Mysql服务器 , 这里是指 Netty服务端 Apache服务端 tomcat服务端 Mysql服务端 。 再比如 一个游戏的登录服务器 没人会叫他 游戏Netty服务程序 或者Netty登录服务程序 , 而会称呼它是 Netty服务器或者登录服务器 或者xxx游戏登录服务器之类的。 只是依照行业术语来说的 被误会了很抱歉 这里解释一下 。
再次回答一下这个问题 Netty NIO不用突破65536个端口限制 因为根本没有这个端口限制问题 只有主动发起一个请求 才会占用一个本地端口 主动发起10个请求 会占用10个本地端口 我这里说的是长连接 Netty NIO是属于服务程序 他只需要监听一个端口 比如8000端口 这时候有10个客户端 连接到这个Netty服务器 都是10个客户端全都连接到服务器的8000端口 服务端只会占用8000端口这一个端口 所以不需要突破65536端口限制
④ socket高并发网络编程服务端有什么框架
netty;
PayServer.java
package com.miri.pay.scoket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PayServer implements Runnable
{
private static final Logger DLOG = LoggerFactory.getLogger(PayServer.class);
private final int port;
public PayServer(int port)
{
this.port = port;
}
/**
* 为ServerBootstrap绑定指定端口
*/
public void run()
{
// 用于接收发来的连接请求
final EventLoopGroup bossGroup = new NioEventLoopGroup();
// 用于处理boss接受并且注册给worker的连接中的信息
final EventLoopGroup workerGroup = new NioEventLoopGroup();
try
{
// 配置服务器
final ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG, 128);
// 通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
bootstrap.option(ChannelOption.TCP_NODELAY, true);
// 保持长连接状态
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
// CustomChannelInitializer是一个特殊的handler,用于方便的配置用户自定义的handler实现
bootstrap.childHandler(new CustomChannelInitializer());
// 绑定并开始接受传入的连接
final ChannelFuture future = bootstrap.bind(this.port).sync();
if (future.isSuccess())
{
PayServer.DLOG.info("Start the socket server {} success", this.port);
}
else
{
PayServer.DLOG.info("Start the socket server {} failure,System exit!", this.port);
throw new RuntimeException("Socket服务端启动失败");
}
// 等待服务器套接字关闭
// 关闭服务器
future.channel().closeFuture().sync();
}
catch (final InterruptedException e)
{
PayServer.DLOG.error("Close the socket server exception occurs,System exit!", e);
throw new RuntimeException("关闭Socket服务端失败");
}
finally
{
// 关闭所有事件循环终止线程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
/**
* 特殊的内部类
* <p>
* 是一个特殊的handler,用于方便的配置用户自定义的handler实现
* @author xulonghui
*/
static class CustomChannelInitializer extends ChannelInitializer<SocketChannel>
{
@Override
protected void initChannel(SocketChannel ch) throws Exception
{
final ChannelPipeline p = ch.pipeline();
p.addLast(new PayMessageEncoder());
p.addLast(new PayMessageDecoder());
p.addLast(new PayServerHandler());
}
}
}
PayMessageEncoder.java
package com.miri.pay.scoket;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;
import com.miri.pay.model.CommonResponse;
import com.miri.pay.utils.JsonUtil;
/**
*消息编码器
* <p>
* 编码从服务端发送出的消息
*/
public class PayMessageEncoder extends MessageToByteEncoder<CommonResponse>
{
@Override
protected void encode(ChannelHandlerContext ctx, CommonResponse rsp, ByteBuf out) throws Exception
{
if (rsp != null)
{
final Object msgContent = rsp.getMsgContent();
// 消息ID,sequenceId和entityId三个加起来是12个长度
int msgLen = 12;
byte[] contentbs = new byte[] {};
if (msgContent != null)
{
final String content = JsonUtil.bean2json(msgContent);
contentbs = content.getBytes(CharsetUtil.UTF_8);
final int cl = contentbs.length;
msgLen += cl;
}
out.writeInt(msgLen);// 写入当前消息的总长度
out.writeInt(rsp.getMsgId());// 写入当前消息的消息ID
out.writeInt(rsp.getSequenceId());// 写入当前消息的SequenceId
out.writeInt(rsp.getEntityId());// 写入当前消息的EntityId
// 写入消息主体内容
if (contentbs.length > 0)
{
out.writeBytes(contentbs);
}
}
}
}
PayMessageDecoder.java
package com.miri.pay.scoket;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.CharsetUtil;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.miri.pay.constants.Constants;
import com.miri.pay.model.CommonRequest;
import com.miri.pay.utils.ByteUtil;
/**
* 消息解码器
* <p>
* 解码从客户端请求的消息
*/
public class PayMessageDecoder extends ByteToMessageDecoder
{
private static final Logger DLOG = LoggerFactory.getLogger(PayMessageDecoder.class);
/**
* 表示头长度的字节数
*/
private static final int HEAD_LENGTH = 4;
/**
* 所有ID串所属的字节数
*/
private static final int ID_STR_LENGTH = 12;
/**
* 单个ID所属的字节数
*/
private static final int SINGLE_ID_LENGTH = 4;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
{
int readable = in.readableBytes();
if (readable < PayMessageDecoder.HEAD_LENGTH)
{
return;
}
in.markReaderIndex(); // 我们标记一下当前的readIndex的位置
final int dataLength = in.readInt(); // 读取传送过来的消息的长度。ByteBuf 的readInt()方法会让他的readIndex增加4
if (dataLength < 0)
{
// 我们读到的消息体长度为0,这是不应该出现的情况,这里出现这情况,关闭连接。
ctx.close();
}
readable = in.readableBytes();
if (readable < dataLength)
{
// 读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方
in.resetReaderIndex();
return;
}
final byte[] body = new byte[dataLength];
in.readBytes(body);
// 判断是否读取到内容
final int length = body.length;
if (length == 0)
{
return;// 若未读出任何内容,则忽略
}
out.add(this.byte2req(body));
}
/**
* 将读取到的byte数据转换为请求对象
* @param body
* @return
* @throws Exception
*/
private CommonRequest byte2req(byte[] body) throws Exception
{
final CommonRequest req = new CommonRequest(Constants.INVALID_MSGID);
final int length = body.length;
// 若内容数组的长度小于或等于12,则表示消息主体内容为空,直接返回一个无效的消息出去
if (length < PayMessageDecoder.ID_STR_LENGTH)
{
PayMessageDecoder.DLOG
.info("The client sends the message length is: {}, is invalid message, directly returns a msgId = {} request entity",
length, Constants.INVALID_MSGID);
return req;
}
// 获取消息ID
final byte[] mbs = new byte[PayMessageDecoder.SINGLE_ID_LENGTH];
System.array(body, 0, mbs, 0, PayMessageDecoder.SINGLE_ID_LENGTH);
final int msgId = ByteUtil.byte4toint(mbs);
req.setMsgId(msgId);
// 获取sequenceId
final byte[] sbs = new byte[PayMessageDecoder.SINGLE_ID_LENGTH];
System.array(body, 4, sbs, 0, PayMessageDecoder.SINGLE_ID_LENGTH);
final int sequenceId = ByteUtil.byte4toint(sbs);
req.setSequenceId(sequenceId);
// 获取entityId
final byte[] ebs = new byte[PayMessageDecoder.SINGLE_ID_LENGTH];
System.array(body, 8, ebs, 0, PayMessageDecoder.SINGLE_ID_LENGTH);
final int entityId = ByteUtil.byte4toint(ebs);
req.setEntityId(entityId);
// 获取消息主体内容
if (length > PayMessageDecoder.ID_STR_LENGTH)
{
final int contentLen = length - PayMessageDecoder.ID_STR_LENGTH;
final byte[] contentbs = new byte[contentLen];
System.array(body, 12, contentbs, 0, contentLen);
final String content = new String(contentbs, CharsetUtil.UTF_8);
req.setMsgContent(content);
}
return req;
}
}
PayServerHandler.java
package com.miri.pay.scoket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.miri.pay.MessageQueue;
import com.miri.pay.model.CommonRequest;
import com.miri.pay.model.PendingBean;
/**
* Socket服务端处理器
*/
public class PayServerHandler extends ChannelInboundHandlerAdapter
{
private static final Logger DLOG = LoggerFactory.getLogger(PayServerHandler.class);
/**
* 外部订单号-频道
*/
public static final Map<String, Channel> CHANNELS = new HashMap<String, Channel>();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
try
{
PayServerHandler.DLOG.info("Client send to msg is: {}", msg);
final CommonRequest request = (CommonRequest) msg;
final PendingBean bean = new PendingBean(ctx.channel(), request);
MessageQueue.offer(bean);
}
finally
{
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
{
ctx.flush();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
super.channelActive(ctx);
final Channel channel = ctx.channel();
PayServerHandler.DLOG.info("Client active form {}", channel.remoteAddress());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
{
super.channelInactive(ctx);
final Channel channel = ctx.channel();
PayServerHandler.DLOG.info("Client inactive form {}", channel.remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
PayServerHandler.DLOG.error("System exception", cause);
ctx.close();
}
}