彻底搞懂 Netty 线程模型( 四 )

服务端所注册的自定义回调函数 NettyServerHandler:
package com.niuh.netty.base;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;/** * 自定义Handler需要继承netty规定好的某个HandlerAdapter(规范) */public class NettyServerHandler extends ChannelInboundHandlerAdapter {    /**     * 读取客户端发送的数据     *     * @param ctx 上下文对象, 含有通道channel,管道pipeline     * @param msg 就是客户端发送的数据     * @throws Exception     */    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("服务器读取线程 " + Thread.currentThread().getName());        //Channel channel = ctx.channel();        //ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站        //将 msg 转成一个 ByteBuf,类似NIO 的 ByteBuffer        ByteBuf buf = (ByteBuf) msg;        System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));    }    /**     * 数据读取完毕处理方法     *     * @param ctx     * @throws Exception     */    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));        ctx.writeAndFlush(buf);    }    /**     * 处理异常, 一般是需要关闭通道     *     * @param ctx     * @param cause     * @throws Exception     */    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        ctx.close();    }}这个类继承了ChannelInboundHandlerAdapter的几个方法:

  • channelRead方法:当客户端与服务端连通好之后,客户端发数据时,服务端会主动调用这个方法 。
  • channelReadComplete方法:数据处理完毕的方法,ctx.writeAndFlush()就可以往客户端写回数据了 。
客户端代码package com.niuh.netty.base;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {    public static void main(String[] args) throws Exception {        //客户端需要一个事件循环组        EventLoopGroup group = new NioEventLoopGroup();        try {            //创建客户端启动对象            //注意客户端使用的不是ServerBootstrap而是Bootstrap            Bootstrap bootstrap = new Bootstrap();            //设置相关参数            bootstrap.group(group) //设置线程组                    .channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现                    .handler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            //加入处理器                            ch.pipeline().addLast(new NettyClientHandler());                        }                    });            System.out.println("netty client start 。。");            //启动客户端去连接服务器端            ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();            //对通道关闭进行监听            cf.channel().closeFuture().sync();        } finally {            group.shutdownGracefully();        }    }}


推荐阅读