Ⅰ socket高并发网络编程服务端有什么框架
netty;
PayServer.java
package com.miri.pay.scoket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PayServer implements Runnable
{
private static final Logger DLOG = LoggerFactory.getLogger(PayServer.class);
private final int port;
public PayServer(int port)
{
this.port = port;
}
/**
* 为ServerBootstrap绑定指定端口
*/
public void run()
{
// 用于接收发来的连接请求
final EventLoopGroup bossGroup = new NioEventLoopGroup();
// 用于处理boss接受并且注册给worker的连接中的信息
final EventLoopGroup workerGroup = new NioEventLoopGroup();
try
{
// 配置服务器
final ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG, 128);
// 通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
bootstrap.option(ChannelOption.TCP_NODELAY, true);
// 保持长连接状态
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
// CustomChannelInitializer是一个特殊的handler,用于方便的配置用户自定义的handler实现
bootstrap.childHandler(new CustomChannelInitializer());
// 绑定并开始接受传入的连接
final ChannelFuture future = bootstrap.bind(this.port).sync();
if (future.isSuccess())
{
PayServer.DLOG.info("Start the socket server {} success", this.port);
}
else
{
PayServer.DLOG.info("Start the socket server {} failure,System exit!", this.port);
throw new RuntimeException("Socket服务端启动失败");
}
// 等待服务器套接字关闭
// 关闭服务器
future.channel().closeFuture().sync();
}
catch (final InterruptedException e)
{
PayServer.DLOG.error("Close the socket server exception occurs,System exit!", e);
throw new RuntimeException("关闭Socket服务端失败");
}
finally
{
// 关闭所有事件循环终止线程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
/**
* 特殊的内部类
* <p>
* 是一个特殊的handler,用于方便的配置用户自定义的handler实现
* @author xulonghui
*/
static class CustomChannelInitializer extends ChannelInitializer<SocketChannel>
{
@Override
protected void initChannel(SocketChannel ch) throws Exception
{
final ChannelPipeline p = ch.pipeline();
p.addLast(new PayMessageEncoder());
p.addLast(new PayMessageDecoder());
p.addLast(new PayServerHandler());
}
}
}
PayMessageEncoder.java
package com.miri.pay.scoket;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;
import com.miri.pay.model.CommonResponse;
import com.miri.pay.utils.JsonUtil;
/**
*消息编码器
* <p>
* 编码从服务端发送出的消息
*/
public class PayMessageEncoder extends MessageToByteEncoder<CommonResponse>
{
@Override
protected void encode(ChannelHandlerContext ctx, CommonResponse rsp, ByteBuf out) throws Exception
{
if (rsp != null)
{
final Object msgContent = rsp.getMsgContent();
// 消息ID,sequenceId和entityId三个加起来是12个长度
int msgLen = 12;
byte[] contentbs = new byte[] {};
if (msgContent != null)
{
final String content = JsonUtil.bean2json(msgContent);
contentbs = content.getBytes(CharsetUtil.UTF_8);
final int cl = contentbs.length;
msgLen += cl;
}
out.writeInt(msgLen);// 写入当前消息的总长度
out.writeInt(rsp.getMsgId());// 写入当前消息的消息ID
out.writeInt(rsp.getSequenceId());// 写入当前消息的SequenceId
out.writeInt(rsp.getEntityId());// 写入当前消息的EntityId
// 写入消息主体内容
if (contentbs.length > 0)
{
out.writeBytes(contentbs);
}
}
}
}
PayMessageDecoder.java
package com.miri.pay.scoket;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.CharsetUtil;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.miri.pay.constants.Constants;
import com.miri.pay.model.CommonRequest;
import com.miri.pay.utils.ByteUtil;
/**
* 消息解码器
* <p>
* 解码从客户端请求的消息
*/
public class PayMessageDecoder extends ByteToMessageDecoder
{
private static final Logger DLOG = LoggerFactory.getLogger(PayMessageDecoder.class);
/**
* 表示头长度的字节数
*/
private static final int HEAD_LENGTH = 4;
/**
* 所有ID串所属的字节数
*/
private static final int ID_STR_LENGTH = 12;
/**
* 单个ID所属的字节数
*/
private static final int SINGLE_ID_LENGTH = 4;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
{
int readable = in.readableBytes();
if (readable < PayMessageDecoder.HEAD_LENGTH)
{
return;
}
in.markReaderIndex(); // 我们标记一下当前的readIndex的位置
final int dataLength = in.readInt(); // 读取传送过来的消息的长度。ByteBuf 的readInt()方法会让他的readIndex增加4
if (dataLength < 0)
{
// 我们读到的消息体长度为0,这是不应该出现的情况,这里出现这情况,关闭连接。
ctx.close();
}
readable = in.readableBytes();
if (readable < dataLength)
{
// 读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方
in.resetReaderIndex();
return;
}
final byte[] body = new byte[dataLength];
in.readBytes(body);
// 判断是否读取到内容
final int length = body.length;
if (length == 0)
{
return;// 若未读出任何内容,则忽略
}
out.add(this.byte2req(body));
}
/**
* 将读取到的byte数据转换为请求对象
* @param body
* @return
* @throws Exception
*/
private CommonRequest byte2req(byte[] body) throws Exception
{
final CommonRequest req = new CommonRequest(Constants.INVALID_MSGID);
final int length = body.length;
// 若内容数组的长度小于或等于12,则表示消息主体内容为空,直接返回一个无效的消息出去
if (length < PayMessageDecoder.ID_STR_LENGTH)
{
PayMessageDecoder.DLOG
.info("The client sends the message length is: {}, is invalid message, directly returns a msgId = {} request entity",
length, Constants.INVALID_MSGID);
return req;
}
// 获取消息ID
final byte[] mbs = new byte[PayMessageDecoder.SINGLE_ID_LENGTH];
System.array(body, 0, mbs, 0, PayMessageDecoder.SINGLE_ID_LENGTH);
final int msgId = ByteUtil.byte4toint(mbs);
req.setMsgId(msgId);
// 获取sequenceId
final byte[] sbs = new byte[PayMessageDecoder.SINGLE_ID_LENGTH];
System.array(body, 4, sbs, 0, PayMessageDecoder.SINGLE_ID_LENGTH);
final int sequenceId = ByteUtil.byte4toint(sbs);
req.setSequenceId(sequenceId);
// 获取entityId
final byte[] ebs = new byte[PayMessageDecoder.SINGLE_ID_LENGTH];
System.array(body, 8, ebs, 0, PayMessageDecoder.SINGLE_ID_LENGTH);
final int entityId = ByteUtil.byte4toint(ebs);
req.setEntityId(entityId);
// 获取消息主体内容
if (length > PayMessageDecoder.ID_STR_LENGTH)
{
final int contentLen = length - PayMessageDecoder.ID_STR_LENGTH;
final byte[] contentbs = new byte[contentLen];
System.array(body, 12, contentbs, 0, contentLen);
final String content = new String(contentbs, CharsetUtil.UTF_8);
req.setMsgContent(content);
}
return req;
}
}
PayServerHandler.java
package com.miri.pay.scoket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.miri.pay.MessageQueue;
import com.miri.pay.model.CommonRequest;
import com.miri.pay.model.PendingBean;
/**
* Socket服务端处理器
*/
public class PayServerHandler extends ChannelInboundHandlerAdapter
{
private static final Logger DLOG = LoggerFactory.getLogger(PayServerHandler.class);
/**
* 外部订单号-频道
*/
public static final Map<String, Channel> CHANNELS = new HashMap<String, Channel>();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
try
{
PayServerHandler.DLOG.info("Client send to msg is: {}", msg);
final CommonRequest request = (CommonRequest) msg;
final PendingBean bean = new PendingBean(ctx.channel(), request);
MessageQueue.offer(bean);
}
finally
{
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
{
ctx.flush();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
super.channelActive(ctx);
final Channel channel = ctx.channel();
PayServerHandler.DLOG.info("Client active form {}", channel.remoteAddress());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
{
super.channelInactive(ctx);
final Channel channel = ctx.channel();
PayServerHandler.DLOG.info("Client inactive form {}", channel.remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
PayServerHandler.DLOG.error("System exception", cause);
ctx.close();
}
}
Ⅱ android上的socket通信的开源框架有哪些
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty 是一个基于NIO的客户,服务器端编程框架,它在socket的基础上根据各种常用的应用协议又进一步封装,提供更便利的接口。如果需要快速搭建一个C/S服务框架,那Netty过来用是没错。 反过来你的情况是需要学习这个课程,你应该掌握基本的socket编程及其通信原理,所以学习时直接用socket编程比较好。也许哪一天,你灵感来了,编出一个比Netty更好的框架,一个更牛的软件。
Ⅲ android上的socket通信的开源框架有哪些
请去360手机助手下载android学习手册里面有例子、源码和文档
Apache MINA(Multipurpose Infrastructure for Network Applications) 是 Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架。当前发行的 MINA 版本支持基于 Java NIO 技术的 TCP/UDP 应用程序开发、串口通讯程序(只在最新的预览版中提供),MINA 所支持的功能也在进一步的扩展中。目前正在使用 MINA 的软件包括有:Apache Directory Project、AsyncWeb、AMQP(Advanced Message Queuing Protocol)、RED5 Server(Macromedia Flash Media RTMP)、ObjectRADIUS、Openfire 等等。
以上是从网上找到的mina框架简单介绍。
由于正在开发的项目中要求加入及时通信功能(游戏方面),所以在网上找了好几种框架,像openfire、tigase等都是基于Xmpp协议开发的优秀框架。但这些侧重于消息的推送,不适合游戏上的简单交互。所以后来找到了mina这个框架,顺手搭建起来。接下来就是这几天学习的总结了,文章里面没有涉及到逻辑层的方面,只是简单的实现即时通信功能。资源下载我会放在文章的最后面。
一、相关资源下载
(1)Apache官方网站:http://mina.apache.org/downloads.html
(2) Android用jar包(包括官网的资源,我会一律放在网络网盘下)
二、Mina简单配置
服务器端一共要用到四个jar包,包括一个日志包。将他们放在lib中,并加载进去
分别为mina-core-2.0.7.jar slf4j-log4j12-1.7.6.jar slf4j-api-1.7.6.jar log4j-1.2.14.jar(日志管理包)
如果要使用日志的jar包,则要在项目的src目录下新建一个log4j.properties,添加内容如下:
log4j.rootCategory=INFO, stdout , R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[QC] %p [%t] %C.%M(%L) | %m%n
log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.File=D:\Tomcat 5.5\logs\qc.log
log4j.appender.R.layout=org.apache.log4j.PatternLayout
1log4j.appender.R.layout.ConversionPattern=%d-[TS] %p %t %c - %m%n
log4j.logger.com.neusoft=DEBUG
log4j.logger.com.opensymphony.oscache=ERROR
log4j.logger.net.sf.navigator=ERROR
log4j.logger.org.apache.commons=ERROR
log4j.logger.org.apache.struts=WARN
log4j.logger.org.displaytag=ERROR
log4j.logger.org.springframework=DEBUG
log4j.logger.com.ibatis.db=WARN
log4j.logger.org.apache.velocity=FATAL
log4j.logger.com.canoo.webtest=WARN
log4j.logger.org.hibernate.ps.PreparedStatementCache=WARN
log4j.logger.org.hibernate=DEBUG
log4j.logger.org.logicalcobwebs=WARN
log4j.rootCategory=INFO, stdout , R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[QC] %p [%t] %C.%M(%L) | %m%n
log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.File=D:\Tomcat 5.5\logs\qc.log
log4j.appender.R.layout=org.apache.log4j.PatternLayout
1log4j.appender.R.layout.ConversionPattern=%d-[TS] %p %t %c - %m%n
log4j.logger.com.neusoft=DEBUG
log4j.logger.com.opensymphony.oscache=ERROR
log4j.logger.net.sf.navigator=ERROR
log4j.logger.org.apache.commons=ERROR
log4j.logger.org.apache.struts=WARN
log4j.logger.org.displaytag=ERROR
log4j.logger.org.springframework=DEBUG
log4j.logger.com.ibatis.db=WARN
log4j.logger.org.apache.velocity=FATAL
log4j.logger.com.canoo.webtest=WARN
log4j.logger.org.hibernate.ps.PreparedStatementCache=WARN
log4j.logger.org.hibernate=DEBUG
log4j.logger.org.logicalcobwebs=WARN
Android客户端要加入的jar包:mina-core-2.0.7.jar slf4j-android-1.6.1-RC1.jar两个jar包(可能直接使用上面的jar包也会行,我没试过~)
二、Mina服务端
我这边使用的是mina2.0版本,所以可能与mina1.0的版本有所不同。那么首先在服务器端创建开始
新建一个Demo1Server.class文件,里面包含着程序的入口,端口号,Acceptor连接.
1 public class Demo1Server {
2 //日志类的实现
3 private static Logger logger = Logger.getLogger(Demo1Server.class);
4 //端口号,要求客户端与服务器端一致
5 private static int PORT = 4444;
6
7 public static void main(String[] args){
8 IoAcceptor acceptor = null;
9 try{
10 //创建一个非阻塞的server端的Socket
11 acceptor = new NioSocketAcceptor();
12 //设置过滤器(使用mina提供的文本换行符编解码器)
13 acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),LineDelimiter.WINDOWS.getValue(),LineDelimiter.WINDOWS.getValue())));
14 //自定义的编解码器
15 //acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new CharsetCodecFactory()));
16 //设置读取数据的换从区大小
17 acceptor.getSessionConfig().setReadBufferSize(2048);
18 //读写通道10秒内无操作进入空闲状态
19 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
20 //为接收器设置管理服务
21 acceptor.setHandler(new Demo1ServerHandler());
22 //绑定端口
23 acceptor.bind(new InetSocketAddress(PORT));
24
25 logger.info("服务器启动成功... 端口号未:"+PORT);
26
27 }catch(Exception e){
28 logger.error("服务器启动异常...",e);
29 e.printStackTrace();
30 }
31 }
32
33 }
一个很简单的程序入口吧,简单的说就是在服务器上设置一个消息接收器,让它监听从端口传过来的消息并进行处理。那么接下来我们看看怎么进行消息处理。
新建一个消息处理类,或者说是是业务逻辑处理器——Demo1ServerHandler,它继承了IoHandlerAdapter类,它默认覆盖了七个方法,而我们主要使用messageReceived()。
public class Demo1ServerHandler extends IoHandlerAdapter {
public static Logger logger = Logger.getLogger(Demo1ServerHandler.class);
//从端口接受消息,会响应此方法来对消息进行处理
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
String msg = message.toString();
if("exit".equals(msg)){
//如果客户端发来exit,则关闭该连接
session.close(true);
}
//向客户端发送消息
Date date = new Date();
session.write(date);
logger.info("服务器接受消息成功...");
super.messageReceived(session, message);
}
//向客服端发送消息后会调用此方法
@Override
public void messageSent(IoSession session, Object message) throws Exception {
logger.info("服务器发送消息成功...");
super.messageSent(session, message);
}
//关闭与客户端的连接时会调用此方法
@Override
public void sessionClosed(IoSession session) throws Exception {
logger.info("服务器与客户端断开连接...");
super.sessionClosed(session);
}
//服务器与客户端创建连接
@Override
public void sessionCreated(IoSession session) throws Exception {
logger.info("服务器与客户端创建连接...");
super.sessionCreated(session);
}
//服务器与客户端连接打开
@Override
public void sessionOpened(IoSession session) throws Exception {
logger.info("服务器与客户端连接打开...");
super.sessionOpened(session);
}
@Override
public void sessionIdle(IoSession session, IdleStatus status)
throws Exception {
logger.info("服务器进入空闲状态...");
super.sessionIdle(session, status);
}
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
logger.info("服务器发送异常...");
super.exceptionCaught(session, cause);
}
}
很直白的一段程序,相当于将服务器分成了七个状态,而每个状态都有自己的一套逻辑处理方案。
至此,一个最简单的Mina服务器框架就搭好了,我们可以使用电脑上的telnet命令来测试一下服务器能否使用
cmd控制台—>telnet <ip地址> <端口号> 如我的服务器ip地为192.168.1.10 那我就写telnet 192.168.1.10 4444 .此时我们可以看到输出日志为
此时连接已经创建,我们在输入信息服务器就会对信息进行处理,并给出相应的应答。
(telnet的用法不知道的可以自行网络)
三、Mina客户端(Android端)
服务器简单搭建完毕,那么开始在Android端是配置服务器吧。同样的不要忘记加载jar包, 由于Android自带了Logout,所以就不使用Mina的日志包了。
由于接受消息会阻塞Android的进程,所以我把它开在子线程中(同时将其放在Service中,让其在后台运行)
1 public class MinaThread extends Thread {
2
3 private IoSession session = null;
4
5 @Override
6 public void run() {
7 // TODO Auto-generated method stub
8 Log.d("TEST","客户端链接开始...");
9 IoConnector connector = new NioSocketConnector();
10 //设置链接超时时间
11 connector.setConnectTimeoutMillis(30000);
12 //添加过滤器
13 //connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new CharsetCodecFactory()));
14 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),LineDelimiter.WINDOWS.getValue(),LineDelimiter.WINDOWS.getValue())));
15 connector.setHandler(new MinaClientHandler(minaService));
16
17 try{
18 ConnectFuture future = connector.connect(new InetSocketAddress(ConstantUtil.WEB_MATCH_PATH,ConstantUtil.WEB_MATCH_PORT));//创建链接
19 future.awaitUninterruptibly();// 等待连接创建完成
20 session = future.getSession();//获得session
21 session.write("start");
22 }catch (Exception e){
23 Log.d("TEST","客户端链接异常...");
24 }
25 session.getCloseFuture().awaitUninterruptibly();//等待连接断开
26 Log.d("TEST","客户端断开...");
27 connector.dispose();
28 super.run();
29 }
30
31 }
不知道你们注意到了没,客户端的代码与服务器端的极其相似,不同的是服务器是创建NioSocketAcceptor对象,而客户端是创建NioSocketConnect对象。当然同样需要添加编码解码过滤器和业务逻辑过滤器。
业务逻辑过滤器代码:
1 public class MinaClientHandler extends IoHandlerAdapter{
2
3
4 @Override
5 public void exceptionCaught(IoSession session, Throwable cause)
6 throws Exception {
7 Log.d("TEST","客户端发生异常");
8 super.exceptionCaught(session, cause);
9 }
10
11 @Override
12 public void messageReceived(IoSession session, Object message)
13 throws Exception {
14 String msg = message.toString();
15 Log.d("TEST","客户端接收到的信息为:" + msg);
16 super.messageReceived(session, message);
17 }
18
19 @Override
20 public void messageSent(IoSession session, Object message) throws Exception {
21 // TODO Auto-generated method stub
22 super.messageSent(session, message);
23 }
24 }
方法功能与服务器端一样。测试这里就不做了。可以的话自己写个Demo效果更好
四、Mina的更多功能
拿到所有客户端Session
Collection<IoSession> sessions = session.getService().getManagedSessions().values();
自定义编码解码器,可以对消息进行预处理。要继承ProtocolEncoder和ProtocolDecode类。
数据对象的传递
这些功能不便放在这里讲了,可能我会以后再找机会另开一篇来讲述这些功能~,大家可以浏览结尾处的参考文章来加深对mina的理解。
在我认为,熟悉和快速使用一个新的的框架可以看出一个程序员的水平,同样及时总结和归纳自己学到的新知识也是一个好的程序员该具有的习惯。那么Mina的简单搭建就到这里为止了,希望对大家有所帮助
Ⅳ workerman汉语什么意思
workerman是一个高性能的php socket 服务器框架。
workerman基于PHP多进程以及libevent事件轮询库,PHP开发者只要实现一两个接口,便可以开发出自己的网络应用,例如Rpc服务、聊天室服务器、手机游戏服务器等。
workerman的目标是让PHP开发者更容易的开发出基于socket的高性能的应用服务,而不用去了解PHP socket以及PHP多进程细节。 workerman本身是一个PHP多进程服务器框架,具有PHP进程管理以及socket通信的模块,所以不依赖php-fpm、nginx或者apache等这些容器便可以独立运行。