相关推荐:Mina、Netty、Twisted一起学(十):线程模型

要想开发一个高性能的TCP服务器,熟悉所使用框架的线程模型非常重要。MINA、Netty、Twisted本身都是高性能的网络框架,如果再搭配上高效率的代码,才能实现一个高大上的服务器。但是如果不了解它们的线程模型,就很难写出高性能的代码。框架本身效率再高,程

开发过Web应用的同学应该都会使用session。由于HTTP协议本身是无状态的,所以一个客户端多次访问这个web应用的多个页面,服务器无法判断多次访问的客户端是否是同一个客户端。有了session就可以设置一些和客户端相关的属性,用于保持这种连接状态。例如用户登录系统后,设置session标记这个客户端已登录,那么访问别的页面时就不用再次登录了。不过本文的内容不是Web应用的session,而是TCP连接的session,实际上二者还是有很大区别的。Web应用的session实现方式并不是基于同一个TCP连接,而是通过cookie实现,这里不再详细展开。上面讲到Web应用的session只是让大家理解session的概念。在同步阻塞的网络编程中,代码都是按照TCP操作顺序编写的,即创建连接、多次读写、关闭连接,这样很容易判断这一系列操作是否是同一个连接。而在事件驱动的异步网络编程框架中,IO操作都会触发一个事件调用相应的事件函数,例如接收到客户端的新数据,会调用messageReceived(MINA)、channelRead(Netty)、dataReceived(Twisted),同一个TCP连接的多次请求和多个客户端请求都是一样的。那么如何判断多次请求到底是不是同一个TCP连接,如何保存连接相关的信息?针对这个问题,MINA、Netty、Twisted都提供了相应的解决方案。下面分别用MINA、Netty、Twisted实现一个请求次数计数器,用于记录同一个连接多次请求的请求次数。MINA:在MINA中,每当一个客户端连接到服务器,就会创建一个新的IoSession,直到客户端断开连接才会销毁。IoSession可以用setAttribute和getAttribute来存储和获取一个TCP连接的相关信息。MINA官方文档对IoSession的解释:The Session is at the heart of MINA : every time a client connects to the server, a new session is created, and will be kept in memory until the client is disconnected.A session is used to store persistent informations about the connection, plus any kind of information the server might need to use during the request processing, and eventually during the whole session life.public class TcpServer {

public static void main(String[] args) throws IOException {

IoAcceptor acceptor = new NioSocketAcceptor();

acceptor.getFilterChain().addLast("codec",

new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), "\r\n", "\r\n")));

acceptor.setHandler(new TcpServerHandle());

acceptor.bind(new InetSocketAddress(8080));

}}class TcpServerHandle extends IoHandlerAdapter {

@Override

public void exceptionCaught(IoSession session, Throwable cause)

throws Exception {

cause.printStackTrace();

}

// 接收到新的数据

@Override

public void messageReceived(IoSession session, Object message)

throws Exception {

int counter = 1;

// 第一次请求,创建session中的counter

if(session.getAttribute("counter") == null) {

session.setAttribute("counter", 1);

} else {

// 获取session中的counter,加1后再存入session

counter = (Integer) session.getAttribute("counter");

counter++;

session.setAttribute("counter", counter);

}

String line = (String) message;

System.out.println("第" + counter + "次请求:" + line);

}}Netty:Netty中分为两种情况,一种是针对每个TCP连接创建一个新的ChannelHandler实例,另一种是所有TCP连接共用一个ChannelHandler实例。这两种方式的区别在于ChannelPipeline的addLast方法中添加的是否是新的ChannelHandler实例。针对每个TCP连接创建一个新的ChannelHandler实例:针对每个TCP连接创建一个新的ChannelHandler实例是最常用的一种方式。这种情况非常简单,直接在ChannelHandler的实现类中加入一个成员变量即可保存连接相关的信息。这也是Netty官方文档中推荐的一种方式,不过要保证针对每个连接创建新的ChannelHandler实例:A ChannelHandler often needs to store some stateful information. The simplest and recommended approach is to use member variables.Because the handler instance has a state variable which is dedicated to one connection, you have to create a new handler instance for each new channel to avoid a race condition where a unauthenticated client can get the confidential information.public class TcpServer {

public static void main(String[] args) throws InterruptedException {

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

try {

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.childHandler(new ChannelInitializer<SocketChannel>() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new LineBasedFrameDecoder(80));

pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));

pipeline.addLast(new TcpServerHandler()); // 针对每个TCP连接创建一个新的ChannelHandler实例

}

});

ChannelFuture f = b.bind(8080).sync();

f.channel().closeFuture().sync();

} finally {

workerGroup.shutdownGracefully();

bossGroup.shutdownGracefully();

}

}}class TcpServerHandler extends ChannelInboundHandlerAdapter {

// 连接相关的信息直接保存在TcpServerHandler的成员变量中

private int counter = 0;

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) {

counter++;

String line = (String) msg;

System.out.println("第" + counter + "次请求:" + line);

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

cause.printStackTrace();

ctx.close();

}}所有TCP连接共用一个ChannelHandler实例:在这种情况下,就不能把连接相关的信息放在ChannelHandler实现类的成员变量中了,否则这些信息会被其他连接共用。这里就要使用到ChannelHandlerContext的Attribute了。Netty文档节选:Although it's recommended to use member variables to store the state of a handler, for some reason you might not want to create many handler instances. In such a case, you can use AttributeKeys which is provided by ChannelHandlerContext.public class TcpServer {

public static void main(String[] args) throws InterruptedException {

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

try {

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.childHandler(new ChannelInitializer<SocketChannel>() {

private TcpServerHandler tcpServerHandler = new TcpServerHandler();

@Override

public void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new LineBasedFrameDecoder(80));

pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));

pipeline.addLast(tcpServerHandler); // 多个连接使用同一个ChannelHandler实例

}

});

ChannelFuture f = b.bind(8080).sync();

f.channel().closeFuture().sync();

} finally {

workerGroup.shutdownGracefully();

bossGroup.shutdownGracefully();

}

}}@Sharable // 多个连接使用同一个ChannelHandler,要加上@Sharable注解class TcpServerHandler extends ChannelInboundHandlerAdapter {

private AttributeKey<Integer> attributeKey = AttributeKey.valueOf("counter");

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) {

Attribute<Integer> attribute = ctx.attr(attributeKey);

int counter = 1;

if(attribute.get() == null) {

attribute.set(1);

} else {

counter = attribute.get();

counter++;

attribute.set(counter);

}

String line = (String) msg;

System.out.println("第" + counter + "次请求:" + line);

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

cause.printStackTrace();

ctx.close();

}}Twisted:在Twisted中,每个TCP连接都会创建一个新的Protocol实例,这样也就很简单了,直接将连接相关的信息保存为Protocol继承类的属性。Twisted文档节选:An instance of the protocol class is instantiated per-connection, on demand, and will go away when the connection is finished.# -*- coding:utf-8 –*-from twisted.protocols.basic import LineOnlyReceiverfrom twisted.internet.protocol import Factoryfrom twisted.internet import reactorclass TcpServerHandle(LineOnlyReceiver):

# 连接相关的信息直接保存为Protocol继承类TcpServerHandle的属性

counter = 0;

def lineReceived(self, data):

self.counter += 1

print "第" + str(self.counter) + "次请求:" + datafactory = Factory()factory.protocol = TcpServerHandlereactor.listenTCP(8080, factory)reactor.run()下面是一个Java实现的客户端,代码中发起了3次TCP连接,在每个连接中发送两次请求数据到服务器:public class TcpClient {

public static void main(String[] args) throws IOException, InterruptedException {

// 3次TCP连接,每个连接发送2个请求数据

for(int i = 0; i < 3; i++) {

Socket socket = null;

OutputStream out = null;

try {

socket = new Socket("localhost", 8080);

out = socket.getOutputStream();

// 第一次请求服务器

String lines1 = "Hello\r\n";

byte[] outputBytes1 = lines1.getBytes("UTF-8");

out.write(outputBytes1);

out.flush();

// 第二次请求服务器

String lines2 = "World\r\n";

byte[] outputBytes2 = lines2.getBytes("UTF-8");

out.write(outputBytes2);

out.flush();

} finally {

// 关闭连接

out.close();

socket.close();

}

Thread.sleep(1000);

}

}}分别测试上面的4个服务器,输出结果都是:第1次请求:Hello第2次请求:World第1次请求:Hello第2次请求:World第1次请求:Hello第2次请求:World

相关推荐:Mina、Netty、Twisted一起学(五):整合protobuf

protobuf是谷歌的Protocol Buffers的简称,用于结构化数据和字节码之间互相转换(序列化、反序列化),一般应用于网络传输,可支持多种编程语言。protobuf如何使用这里不再介绍,本文主要介绍在MINA、Netty、Twisted中如何使用protobuf,不了解protobuf的同

快照源:http://www.cnblogs.com/wucao/p/3965772.html