Ⅰ socket高並發網路編程服務端有什麼框架
netty;
PayServer.java
package com.miri.pay.scoket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PayServer implements Runnable
{
private static final Logger DLOG = LoggerFactory.getLogger(PayServer.class);
private final int port;
public PayServer(int port)
{
this.port = port;
}
/**
* 為ServerBootstrap綁定指定埠
*/
public void run()
{
// 用於接收發來的連接請求
final EventLoopGroup bossGroup = new NioEventLoopGroup();
// 用於處理boss接受並且注冊給worker的連接中的信息
final EventLoopGroup workerGroup = new NioEventLoopGroup();
try
{
// 配置伺服器
final ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG, 128);
// 通過NoDelay禁用Nagle,使消息立即發出去,不用等待到一定的數據量才發出去
bootstrap.option(ChannelOption.TCP_NODELAY, true);
// 保持長連接狀態
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
// CustomChannelInitializer是一個特殊的handler,用於方便的配置用戶自定義的handler實現
bootstrap.childHandler(new CustomChannelInitializer());
// 綁定並開始接受傳入的連接
final ChannelFuture future = bootstrap.bind(this.port).sync();
if (future.isSuccess())
{
PayServer.DLOG.info("Start the socket server {} success", this.port);
}
else
{
PayServer.DLOG.info("Start the socket server {} failure,System exit!", this.port);
throw new RuntimeException("Socket服務端啟動失敗");
}
// 等待伺服器套接字關閉
// 關閉伺服器
future.channel().closeFuture().sync();
}
catch (final InterruptedException e)
{
PayServer.DLOG.error("Close the socket server exception occurs,System exit!", e);
throw new RuntimeException("關閉Socket服務端失敗");
}
finally
{
// 關閉所有事件循環終止線程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
/**
* 特殊的內部類
* <p>
* 是一個特殊的handler,用於方便的配置用戶自定義的handler實現
* @author xulonghui
*/
static class CustomChannelInitializer extends ChannelInitializer<SocketChannel>
{
@Override
protected void initChannel(SocketChannel ch) throws Exception
{
final ChannelPipeline p = ch.pipeline();
p.addLast(new PayMessageEncoder());
p.addLast(new PayMessageDecoder());
p.addLast(new PayServerHandler());
}
}
}
PayMessageEncoder.java
package com.miri.pay.scoket;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;
import com.miri.pay.model.CommonResponse;
import com.miri.pay.utils.JsonUtil;
/**
*消息編碼器
* <p>
* 編碼從服務端發送出的消息
*/
public class PayMessageEncoder extends MessageToByteEncoder<CommonResponse>
{
@Override
protected void encode(ChannelHandlerContext ctx, CommonResponse rsp, ByteBuf out) throws Exception
{
if (rsp != null)
{
final Object msgContent = rsp.getMsgContent();
// 消息ID,sequenceId和entityId三個加起來是12個長度
int msgLen = 12;
byte[] contentbs = new byte[] {};
if (msgContent != null)
{
final String content = JsonUtil.bean2json(msgContent);
contentbs = content.getBytes(CharsetUtil.UTF_8);
final int cl = contentbs.length;
msgLen += cl;
}
out.writeInt(msgLen);// 寫入當前消息的總長度
out.writeInt(rsp.getMsgId());// 寫入當前消息的消息ID
out.writeInt(rsp.getSequenceId());// 寫入當前消息的SequenceId
out.writeInt(rsp.getEntityId());// 寫入當前消息的EntityId
// 寫入消息主體內容
if (contentbs.length > 0)
{
out.writeBytes(contentbs);
}
}
}
}
PayMessageDecoder.java
package com.miri.pay.scoket;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.CharsetUtil;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.miri.pay.constants.Constants;
import com.miri.pay.model.CommonRequest;
import com.miri.pay.utils.ByteUtil;
/**
* 消息解碼器
* <p>
* 解碼從客戶端請求的消息
*/
public class PayMessageDecoder extends ByteToMessageDecoder
{
private static final Logger DLOG = LoggerFactory.getLogger(PayMessageDecoder.class);
/**
* 表示頭長度的位元組數
*/
private static final int HEAD_LENGTH = 4;
/**
* 所有ID串所屬的位元組數
*/
private static final int ID_STR_LENGTH = 12;
/**
* 單個ID所屬的位元組數
*/
private static final int SINGLE_ID_LENGTH = 4;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
{
int readable = in.readableBytes();
if (readable < PayMessageDecoder.HEAD_LENGTH)
{
return;
}
in.markReaderIndex(); // 我們標記一下當前的readIndex的位置
final int dataLength = in.readInt(); // 讀取傳送過來的消息的長度。ByteBuf 的readInt()方法會讓他的readIndex增加4
if (dataLength < 0)
{
// 我們讀到的消息體長度為0,這是不應該出現的情況,這里出現這情況,關閉連接。
ctx.close();
}
readable = in.readableBytes();
if (readable < dataLength)
{
// 讀到的消息體長度如果小於我們傳送過來的消息長度,則resetReaderIndex. 這個配合markReaderIndex使用的。把readIndex重置到mark的地方
in.resetReaderIndex();
return;
}
final byte[] body = new byte[dataLength];
in.readBytes(body);
// 判斷是否讀取到內容
final int length = body.length;
if (length == 0)
{
return;// 若未讀出任何內容,則忽略
}
out.add(this.byte2req(body));
}
/**
* 將讀取到的byte數據轉換為請求對象
* @param body
* @return
* @throws Exception
*/
private CommonRequest byte2req(byte[] body) throws Exception
{
final CommonRequest req = new CommonRequest(Constants.INVALID_MSGID);
final int length = body.length;
// 若內容數組的長度小於或等於12,則表示消息主體內容為空,直接返回一個無效的消息出去
if (length < PayMessageDecoder.ID_STR_LENGTH)
{
PayMessageDecoder.DLOG
.info("The client sends the message length is: {}, is invalid message, directly returns a msgId = {} request entity",
length, Constants.INVALID_MSGID);
return req;
}
// 獲取消息ID
final byte[] mbs = new byte[PayMessageDecoder.SINGLE_ID_LENGTH];
System.array(body, 0, mbs, 0, PayMessageDecoder.SINGLE_ID_LENGTH);
final int msgId = ByteUtil.byte4toint(mbs);
req.setMsgId(msgId);
// 獲取sequenceId
final byte[] sbs = new byte[PayMessageDecoder.SINGLE_ID_LENGTH];
System.array(body, 4, sbs, 0, PayMessageDecoder.SINGLE_ID_LENGTH);
final int sequenceId = ByteUtil.byte4toint(sbs);
req.setSequenceId(sequenceId);
// 獲取entityId
final byte[] ebs = new byte[PayMessageDecoder.SINGLE_ID_LENGTH];
System.array(body, 8, ebs, 0, PayMessageDecoder.SINGLE_ID_LENGTH);
final int entityId = ByteUtil.byte4toint(ebs);
req.setEntityId(entityId);
// 獲取消息主體內容
if (length > PayMessageDecoder.ID_STR_LENGTH)
{
final int contentLen = length - PayMessageDecoder.ID_STR_LENGTH;
final byte[] contentbs = new byte[contentLen];
System.array(body, 12, contentbs, 0, contentLen);
final String content = new String(contentbs, CharsetUtil.UTF_8);
req.setMsgContent(content);
}
return req;
}
}
PayServerHandler.java
package com.miri.pay.scoket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.miri.pay.MessageQueue;
import com.miri.pay.model.CommonRequest;
import com.miri.pay.model.PendingBean;
/**
* Socket服務端處理器
*/
public class PayServerHandler extends ChannelInboundHandlerAdapter
{
private static final Logger DLOG = LoggerFactory.getLogger(PayServerHandler.class);
/**
* 外部訂單號-頻道
*/
public static final Map<String, Channel> CHANNELS = new HashMap<String, Channel>();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
try
{
PayServerHandler.DLOG.info("Client send to msg is: {}", msg);
final CommonRequest request = (CommonRequest) msg;
final PendingBean bean = new PendingBean(ctx.channel(), request);
MessageQueue.offer(bean);
}
finally
{
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
{
ctx.flush();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
super.channelActive(ctx);
final Channel channel = ctx.channel();
PayServerHandler.DLOG.info("Client active form {}", channel.remoteAddress());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
{
super.channelInactive(ctx);
final Channel channel = ctx.channel();
PayServerHandler.DLOG.info("Client inactive form {}", channel.remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
PayServerHandler.DLOG.error("System exception", cause);
ctx.close();
}
}
Ⅱ android上的socket通信的開源框架有哪些
Netty是由JBOSS提供的一個java開源框架。Netty提供非同步的、事件驅動的網路應用程序框架和工具,用以快速開發高性能、高可靠性的網路伺服器和客戶端程序。也就是說,Netty 是一個基於NIO的客戶,伺服器端編程框架,它在socket的基礎上根據各種常用的應用協議又進一步封裝,提供更便利的介面。如果需要快速搭建一個C/S服務框架,那Netty過來用是沒錯。 反過來你的情況是需要學習這個課程,你應該掌握基本的socket編程及其通信原理,所以學習時直接用socket編程比較好。也許哪一天,你靈感來了,編出一個比Netty更好的框架,一個更牛的軟體。
Ⅲ android上的socket通信的開源框架有哪些
請去360手機助手下載android學習手冊裡面有例子、源碼和文檔
Apache MINA(Multipurpose Infrastructure for Network Applications) 是 Apache 組織一個較新的項目,它為開發高性能和高可用性的網路應用程序提供了非常便利的框架。當前發行的 MINA 版本支持基於 Java NIO 技術的 TCP/UDP 應用程序開發、串口通訊程序(只在最新的預覽版中提供),MINA 所支持的功能也在進一步的擴展中。目前正在使用 MINA 的軟體包括有:Apache Directory Project、AsyncWeb、AMQP(Advanced Message Queuing Protocol)、RED5 Server(Macromedia Flash Media RTMP)、ObjectRADIUS、Openfire 等等。
以上是從網上找到的mina框架簡單介紹。
由於正在開發的項目中要求加入及時通信功能(游戲方面),所以在網上找了好幾種框架,像openfire、tigase等都是基於Xmpp協議開發的優秀框架。但這些側重於消息的推送,不適合游戲上的簡單交互。所以後來找到了mina這個框架,順手搭建起來。接下來就是這幾天學習的總結了,文章裡面沒有涉及到邏輯層的方面,只是簡單的實現即時通信功能。資源下載我會放在文章的最後面。
一、相關資源下載
(1)Apache官方網站:http://mina.apache.org/downloads.html
(2) Android用jar包(包括官網的資源,我會一律放在網路網盤下)
二、Mina簡單配置
伺服器端一共要用到四個jar包,包括一個日誌包。將他們放在lib中,並載入進去
分別為mina-core-2.0.7.jar slf4j-log4j12-1.7.6.jar slf4j-api-1.7.6.jar log4j-1.2.14.jar(日誌管理包)
如果要使用日誌的jar包,則要在項目的src目錄下新建一個log4j.properties,添加內容如下:
log4j.rootCategory=INFO, stdout , R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[QC] %p [%t] %C.%M(%L) | %m%n
log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.File=D:\Tomcat 5.5\logs\qc.log
log4j.appender.R.layout=org.apache.log4j.PatternLayout
1log4j.appender.R.layout.ConversionPattern=%d-[TS] %p %t %c - %m%n
log4j.logger.com.neusoft=DEBUG
log4j.logger.com.opensymphony.oscache=ERROR
log4j.logger.net.sf.navigator=ERROR
log4j.logger.org.apache.commons=ERROR
log4j.logger.org.apache.struts=WARN
log4j.logger.org.displaytag=ERROR
log4j.logger.org.springframework=DEBUG
log4j.logger.com.ibatis.db=WARN
log4j.logger.org.apache.velocity=FATAL
log4j.logger.com.canoo.webtest=WARN
log4j.logger.org.hibernate.ps.PreparedStatementCache=WARN
log4j.logger.org.hibernate=DEBUG
log4j.logger.org.logicalcobwebs=WARN
log4j.rootCategory=INFO, stdout , R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[QC] %p [%t] %C.%M(%L) | %m%n
log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.File=D:\Tomcat 5.5\logs\qc.log
log4j.appender.R.layout=org.apache.log4j.PatternLayout
1log4j.appender.R.layout.ConversionPattern=%d-[TS] %p %t %c - %m%n
log4j.logger.com.neusoft=DEBUG
log4j.logger.com.opensymphony.oscache=ERROR
log4j.logger.net.sf.navigator=ERROR
log4j.logger.org.apache.commons=ERROR
log4j.logger.org.apache.struts=WARN
log4j.logger.org.displaytag=ERROR
log4j.logger.org.springframework=DEBUG
log4j.logger.com.ibatis.db=WARN
log4j.logger.org.apache.velocity=FATAL
log4j.logger.com.canoo.webtest=WARN
log4j.logger.org.hibernate.ps.PreparedStatementCache=WARN
log4j.logger.org.hibernate=DEBUG
log4j.logger.org.logicalcobwebs=WARN
Android客戶端要加入的jar包:mina-core-2.0.7.jar slf4j-android-1.6.1-RC1.jar兩個jar包(可能直接使用上面的jar包也會行,我沒試過~)
二、Mina服務端
我這邊使用的是mina2.0版本,所以可能與mina1.0的版本有所不同。那麼首先在伺服器端創建開始
新建一個Demo1Server.class文件,裡麵包含著程序的入口,埠號,Acceptor連接.
1 public class Demo1Server {
2 //日誌類的實現
3 private static Logger logger = Logger.getLogger(Demo1Server.class);
4 //埠號,要求客戶端與伺服器端一致
5 private static int PORT = 4444;
6
7 public static void main(String[] args){
8 IoAcceptor acceptor = null;
9 try{
10 //創建一個非阻塞的server端的Socket
11 acceptor = new NioSocketAcceptor();
12 //設置過濾器(使用mina提供的文本換行符編解碼器)
13 acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),LineDelimiter.WINDOWS.getValue(),LineDelimiter.WINDOWS.getValue())));
14 //自定義的編解碼器
15 //acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new CharsetCodecFactory()));
16 //設置讀取數據的換從區大小
17 acceptor.getSessionConfig().setReadBufferSize(2048);
18 //讀寫通道10秒內無操作進入空閑狀態
19 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
20 //為接收器設置管理服務
21 acceptor.setHandler(new Demo1ServerHandler());
22 //綁定埠
23 acceptor.bind(new InetSocketAddress(PORT));
24
25 logger.info("伺服器啟動成功... 埠號未:"+PORT);
26
27 }catch(Exception e){
28 logger.error("伺服器啟動異常...",e);
29 e.printStackTrace();
30 }
31 }
32
33 }
一個很簡單的程序入口吧,簡單的說就是在伺服器上設置一個消息接收器,讓它監聽從埠傳過來的消息並進行處理。那麼接下來我們看看怎麼進行消息處理。
新建一個消息處理類,或者說是是業務邏輯處理器——Demo1ServerHandler,它繼承了IoHandlerAdapter類,它默認覆蓋了七個方法,而我們主要使用messageReceived()。
public class Demo1ServerHandler extends IoHandlerAdapter {
public static Logger logger = Logger.getLogger(Demo1ServerHandler.class);
//從埠接受消息,會響應此方法來對消息進行處理
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
String msg = message.toString();
if("exit".equals(msg)){
//如果客戶端發來exit,則關閉該連接
session.close(true);
}
//向客戶端發送消息
Date date = new Date();
session.write(date);
logger.info("伺服器接受消息成功...");
super.messageReceived(session, message);
}
//向客服端發送消息後會調用此方法
@Override
public void messageSent(IoSession session, Object message) throws Exception {
logger.info("伺服器發送消息成功...");
super.messageSent(session, message);
}
//關閉與客戶端的連接時會調用此方法
@Override
public void sessionClosed(IoSession session) throws Exception {
logger.info("伺服器與客戶端斷開連接...");
super.sessionClosed(session);
}
//伺服器與客戶端創建連接
@Override
public void sessionCreated(IoSession session) throws Exception {
logger.info("伺服器與客戶端創建連接...");
super.sessionCreated(session);
}
//伺服器與客戶端連接打開
@Override
public void sessionOpened(IoSession session) throws Exception {
logger.info("伺服器與客戶端連接打開...");
super.sessionOpened(session);
}
@Override
public void sessionIdle(IoSession session, IdleStatus status)
throws Exception {
logger.info("伺服器進入空閑狀態...");
super.sessionIdle(session, status);
}
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
logger.info("伺服器發送異常...");
super.exceptionCaught(session, cause);
}
}
很直白的一段程序,相當於將伺服器分成了七個狀態,而每個狀態都有自己的一套邏輯處理方案。
至此,一個最簡單的Mina伺服器框架就搭好了,我們可以使用電腦上的telnet命令來測試一下伺服器能否使用
cmd控制台—>telnet <ip地址> <埠號> 如我的伺服器ip地為192.168.1.10 那我就寫telnet 192.168.1.10 4444 .此時我們可以看到輸出日誌為
此時連接已經創建,我們在輸入信息伺服器就會對信息進行處理,並給出相應的應答。
(telnet的用法不知道的可以自行網路)
三、Mina客戶端(Android端)
伺服器簡單搭建完畢,那麼開始在Android端是配置伺服器吧。同樣的不要忘記載入jar包, 由於Android自帶了Logout,所以就不使用Mina的日誌包了。
由於接受消息會阻塞Android的進程,所以我把它開在子線程中(同時將其放在Service中,讓其在後台運行)
1 public class MinaThread extends Thread {
2
3 private IoSession session = null;
4
5 @Override
6 public void run() {
7 // TODO Auto-generated method stub
8 Log.d("TEST","客戶端鏈接開始...");
9 IoConnector connector = new NioSocketConnector();
10 //設置鏈接超時時間
11 connector.setConnectTimeoutMillis(30000);
12 //添加過濾器
13 //connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new CharsetCodecFactory()));
14 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),LineDelimiter.WINDOWS.getValue(),LineDelimiter.WINDOWS.getValue())));
15 connector.setHandler(new MinaClientHandler(minaService));
16
17 try{
18 ConnectFuture future = connector.connect(new InetSocketAddress(ConstantUtil.WEB_MATCH_PATH,ConstantUtil.WEB_MATCH_PORT));//創建鏈接
19 future.awaitUninterruptibly();// 等待連接創建完成
20 session = future.getSession();//獲得session
21 session.write("start");
22 }catch (Exception e){
23 Log.d("TEST","客戶端鏈接異常...");
24 }
25 session.getCloseFuture().awaitUninterruptibly();//等待連接斷開
26 Log.d("TEST","客戶端斷開...");
27 connector.dispose();
28 super.run();
29 }
30
31 }
不知道你們注意到了沒,客戶端的代碼與伺服器端的極其相似,不同的是伺服器是創建NioSocketAcceptor對象,而客戶端是創建NioSocketConnect對象。當然同樣需要添加編碼解碼過濾器和業務邏輯過濾器。
業務邏輯過濾器代碼:
1 public class MinaClientHandler extends IoHandlerAdapter{
2
3
4 @Override
5 public void exceptionCaught(IoSession session, Throwable cause)
6 throws Exception {
7 Log.d("TEST","客戶端發生異常");
8 super.exceptionCaught(session, cause);
9 }
10
11 @Override
12 public void messageReceived(IoSession session, Object message)
13 throws Exception {
14 String msg = message.toString();
15 Log.d("TEST","客戶端接收到的信息為:" + msg);
16 super.messageReceived(session, message);
17 }
18
19 @Override
20 public void messageSent(IoSession session, Object message) throws Exception {
21 // TODO Auto-generated method stub
22 super.messageSent(session, message);
23 }
24 }
方法功能與伺服器端一樣。測試這里就不做了。可以的話自己寫個Demo效果更好
四、Mina的更多功能
拿到所有客戶端Session
Collection<IoSession> sessions = session.getService().getManagedSessions().values();
自定義編碼解碼器,可以對消息進行預處理。要繼承ProtocolEncoder和ProtocolDecode類。
數據對象的傳遞
這些功能不便放在這里講了,可能我會以後再找機會另開一篇來講述這些功能~,大家可以瀏覽結尾處的參考文章來加深對mina的理解。
在我認為,熟悉和快速使用一個新的的框架可以看出一個程序員的水平,同樣及時總結和歸納自己學到的新知識也是一個好的程序員該具有的習慣。那麼Mina的簡單搭建就到這里為止了,希望對大家有所幫助
Ⅳ workerman漢語什麼意思
workerman是一個高性能的php socket 伺服器框架。
workerman基於PHP多進程以及libevent事件輪詢庫,PHP開發者只要實現一兩個介面,便可以開發出自己的網路應用,例如Rpc服務、聊天室伺服器、手機游戲伺服器等。
workerman的目標是讓PHP開發者更容易的開發出基於socket的高性能的應用服務,而不用去了解PHP socket以及PHP多進程細節。 workerman本身是一個PHP多進程伺服器框架,具有PHP進程管理以及socket通信的模塊,所以不依賴php-fpm、nginx或者apache等這些容器便可以獨立運行。