2017年10月22日(星期天)  农历:丁酉年九月初三

作者:三年。分类: IT杂谈 标签: Netty介绍 Netty

QQ图片20160818101128.png

Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的基于NIO的客户,服务器端编程框架。


前言:

高性能的三大主题:
1) 传输:用什么样的通道将数据发送给对方,BIO、NIO或者AIO,IO模型在很大程度上决定了框架的性能。
2) 协议:采用什么样的通信协议,HTTP或者内部私有协议。协议的选择不同,性能模型也不同。相比于公有协议,内部私有协议的性能通常可以被设计的更优。
3) 线程:数据报如何读取?读取之后的编解码在哪个线程进行,编解码后的消息如何派发,Reactor线程模型的不同,对性能的影响也非常大。

Netty 高性能的表现:
1. 异步非阻塞通信
2. 零拷贝
2]1) Netty的接收和发送ByteBuffer采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝
2]2) Netty提供了组合Buffer对象,可以聚合多个ByteBuffer对象,用户可以像操作一个Buffer那样方便的对组合Buffer进行操作,避免系统统通过内存拷贝的方式将几个小Buffer合并成一个大的Buffer。
2]3) Netty的文件传输采用了transferTo方法,它可以直接将文件缓冲区的数据发送到目标Channel,避免了传统通过循环write方式导致的内存拷贝问题。
2]
3. 内存池
4. 高效的Reactor线程模型
5. 无锁化的串行设计理念
6. 高效的并发编程
7. 高性能的序列化框架
8. 灵活的TCP参数配置能力

Netty 服务端创建(详细解析):
  • 创建TimeService
package cn.zxm.netty.demo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
* Time Netty服务端
*
* [url="http://my.oschina.net/arthor"]@author[/url] zhouxm@inspur.com
*
*/
public class TimeService {
   public void bind(int port) throws Exception {
	  // 配置NIO服务端线程组
	  EventLoopGroup bossGroup = new NioEventLoopGroup();// 用于服务端接收客户端的链接
	  EventLoopGroup workGroup = new NioEventLoopGroup();// 用于SocketChannel网络读写
	  try {
		 // 创建NIO服务端辅助启动类
		 ServerBootstrap b = new ServerBootstrap();
		 b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
				.option(ChannelOption.SO_BACKLOG, 1024)
				.childHandler(new ChildChannelHander());
		 // 绑定监听端口,同步等待成功
		 ChannelFuture f = b.bind(port).sync();
		 // 等待监听端口关闭
		 f.channel().closeFuture().sync();
		 // b.bind(port).sync().channel().closeFuture().sync();//也可以这样将上面两步骤合并,使代码更简洁
	  } finally {
		 // 退出,释放线程池资源
		 bossGroup.shutdownGracefully();
		 workGroup.shutdownGracefully();
	  }
   }

   // 用户处理网络IO事件,类似于Reactor模式中的handle类 例:消息编解码 日志打印
   private class ChildChannelHander extends ChannelInitializer<socketchannel> {
	  @Override
	  protected void initChannel(SocketChannel arg0) throws Exception {
		 // TODO Auto-generated method stub
		 ChannelPipeline pipeline = arg0.pipeline();
		 // 以("\n")为结尾分割的 解码器
		 pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192,
				Delimiters.lineDelimiter()));
		 // 字符串解码 和 编码
		 pipeline.addLast("decoder", new StringDecoder());
		 pipeline.addLast("encoder", new StringEncoder());
		 // 自己的逻辑Handler
		 pipeline.addLast("handler", new TimeServerHander());
	  }

   }
public static void main(String args[]){
	  int port = 8080;
	  try {
		 new TimeService().bind(port);
	  } catch (Exception e) {
		 // TODO Auto-generated catch block
		 e.printStackTrace();
	  }
   }
}
  • TimeServerHander
package cn.zxm.netty.demo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
* 逻辑handle类
*
* [url="http://my.oschina.net/arthor"]@author[/url] zhouxm@inspur.com
*
*/
public class TimeServerHander extends ChannelHandlerAdapter {

   /*
	* (non-Javadoc)
	*
	* @see
	* io.netty.channel.ChannelHandlerAdapter#channelActive(io.netty.channel
	* .ChannelHandlerContext)
	*/
   @Override
   public void channelActive(ChannelHandlerContext ctx) throws Exception {
	  // TODO Auto-generated method stub
	  super.channelActive(ctx);
   }

   /*
	* (non-Javadoc)
	*
	* @see io.netty.channel.ChannelHandlerAdapter#channelRead(io.netty.channel.
	* ChannelHandlerContext, java.lang.Object)
	*/
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg)
		 throws Exception {
	  // TODO Auto-generated method stub
	  ByteBuf buf = (ByteBuf) msg;// 将msg转换成Netty ByteBuf 类似于jdk中的ByteBuffer
	  byte[] req = new byte[buf.readableBytes()];// 根据字节数创建byte数组
	  buf.readBytes(req);// 将缓冲区中的字节数组复制到 byte数组中
	  String body = new String(req, "utf-8");// 创建构造函数接收消息
	  System.out.print("client send message :" + body);// 打印
	  String resString = "response msg to client ";
	  ByteBuf b = Unpooled.copiedBuffer(resString.getBytes());
	  ctx.write(B)/>/>/>;// 发送消息给客户端
   }

   /*
	* (non-Javadoc)
	*
	* @see
	* io.netty.channel.ChannelHandlerAdapter#channelReadComplete(io.netty.channel
	* .ChannelHandlerContext)
	*/
   @Override
   public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
	  // TODO Auto-generated method stub
	  ctx.flush();// 将队列中的消息写入到SocketChannel发送给客户端
   }

   /*
	* (non-Javadoc)
	*
	* @see
	* io.netty.channel.ChannelHandlerAdapter#exceptionCaught(io.netty.channel
	* .ChannelHandlerContext, java.lang.Throwable)
	*/
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
		 throws Exception {
	  // TODO Auto-generated method stub
	  ctx.close();// 关闭,释放资源
   }

}

创建Netty客户端:
一.创建TimeClient:

package cn.zxm.netty.demo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class TimeClient {
   public void connect(String host, int port) throws Exception {
	  // 配置客户端NIO线程组
	  EventLoopGroup group = new NioEventLoopGroup();
	  try {
		 // 创建NIO客户端辅助启动类
		 Bootstrap b = new Bootstrap();
		 b.group(group).channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ClientTimeHander());
		 //发起异步链接操作
		 ChannelFuture f = b.connect(host, port).sync();
		 //等待客户端链路关闭
		 f.channel().closeFuture().sync();
	  } finally {
		 group.shutdownGracefully();// 关闭  释放资源
	  }
   }

   private class ClientTimeHander extends ChannelInitializer<socketchannel> {
	  @Override
	  protected void initChannel(SocketChannel arg0) throws Exception {
		 // TODO Auto-generated method stub
		 ChannelPipeline pipeline = arg0.pipeline();
		 // 以("\n")为结尾分割的 解码器
		 pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192,
				Delimiters.lineDelimiter()));
		 // 字符串解码 和 编码
		 pipeline.addLast("decoder", new StringDecoder());
		 pipeline.addLast("encoder", new StringEncoder());
		 // 自己的逻辑Handler
		 pipeline.addLast("handler", new TimeClientHander());
	  }

   }

   public static void main(String[] args) {
	  // TODO Auto-generated method stub
	  String  host = "localhost";
	  int	 port = 8080;
	  try {
		 new TimeClient().connect(host, port);
	  } catch (Exception e) {
		 // TODO Auto-generated catch block
		 e.printStackTrace();
	  }
   }

}

二.创建TimeClientHander:


package cn.zxm.netty.demo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class TimeClientHander extends ChannelHandlerAdapter{
	
   private final ByteBuf firstMsg;
   public TimeClientHander(){
	  byte[] req = "send msg to server date".getBytes();
	  firstMsg = Unpooled.buffer(req.length);
	  firstMsg.writeBytes(req);
   }
   /* (non-Javadoc)
	* @see io.netty.channel.ChannelHandlerAdapter#channelActive(io.netty.channel.ChannelHandlerContext)
	*/
   @Override
   public void channelActive(ChannelHandlerContext ctx) throws Exception {
	  // TODO Auto-generated method stub
	  ctx.writeAndFlush(firstMsg);
   }

   /* (non-Javadoc)
	* @see io.netty.channel.ChannelHandlerAdapter#channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object)
	*/
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg)
		 throws Exception {
	  // TODO Auto-generated method stub
	  ByteBuf buf = (ByteBuf) msg;// 将msg转换成Netty ByteBuf 类似于jdk中的ByteBuffer
	  byte[] req = new byte[buf.readableBytes()];// 根据字节数创建byte数组
	  buf.readBytes(req);// 将缓冲区中的字节数组复制到 byte数组中
	  String body = new String(req, "utf-8");// 创建构造函数接收消息
	  System.out.print("server send message :" + body);// 打印
   }

   /* (non-Javadoc)
	* @see io.netty.channel.ChannelHandlerAdapter#exceptionCaught(io.netty.channel.ChannelHandlerContext, java.lang.Throwable)
	*/
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
		 throws Exception {
	  // TODO Auto-generated method stub
	  ctx.close();
   }

}
温馨提示如有转载或引用以上内容之必要,敬请将本文链接作为出处标注,谢谢合作!

已有 0/2502 人参与

发表评论:



手Q扫描加入Java初学者群