Netty客户端断线重连实现及问题思考( 四 )

com.bruce.netty.rpc.client.ClientHeartbeatHandler
public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter {private static final InternalLogger log = InternalLoggerFactory.getInstance(ClientHeartbeatHandler.class);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg.equals("pong")) {log.info("收到心跳回复");} else {super.channelRead(ctx, msg);}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {//该事件需要配合 io.netty.handler.timeout.IdleStateHandler使用IdleStateEvent idleStateEvent = (IdleStateEvent) evt;if (idleStateEvent.state() == IdleState.ALL_IDLE) {//向服务端发送心跳检测ctx.writeAndFlush("ping");log.info("发送心跳数据");} else if (idleStateEvent.state() == IdleState.READER_IDLE) {//超过指定时间没有读事件,关闭连接log.info("超过心跳时间,关闭和服务端的连接:{}", ctx.channel().remoteAddress());ctx.channel().close();}} else {super.userEventTriggered(ctx, evt);}}}先启动server端,再启动client端,待连接成功之后kill掉 server端进程 。

Netty客户端断线重连实现及问题思考

文章插图
 
【Netty客户端断线重连实现及问题思考】通过客户端日志可以看出,先是执行了exceptionCaught方法然后执行了channelInactive方法,但是这两个方法中都调用了reconnection方法,导致同时执行了两次重连 。
为什么执行了exceptionCaught方法又执行了channelInactive方法呢?
我们可以在exceptionCaught和channelInactive方法添加断点一步步查看源码
Netty客户端断线重连实现及问题思考

文章插图
 
当NioEventLoop执行select操作之后,处理相应的SelectionKey,发生异常后,会调用AbstractNioByteChannel.NioByteUnsafe#handleReadException方法进行处理,并触发pipeline.fireExceptionCaught(cause),最终调用到用户handler的fireExceptionCaught方法 。
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,RecvByteBufAllocator.Handle allocHandle) { if (byteBuf != null) {if (byteBuf.isReadable()) {readPending = false;pipeline.fireChannelRead(byteBuf);} else {byteBuf.release();} } allocHandle.readComplete(); pipeline.fireChannelReadComplete(); pipeline.fireExceptionCaught(cause); // If oom will close the read event, release connection. // See https://github.com/netty/netty/issues/10434 if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {closeonRead(pipeline); }}该方法最后会判断异常类型,执行close连接的方法 。在连接断线的场景中,这里即为java.io.IOException,所以执行了close方法,当debug到AbstractChannel.AbstractUnsafe#close(ChannelPromise, Throwable, ClosedChannelException, notify)方法中会发现最后又调用了AbstractUnsafe#fireChannelInactiveAndDeregister方法,继续debug最后则会执行自定义的fireChannelInactive方法 。
到这里可以总结一个知识点:netty中当执行到handler地fireExceptionCaught方法时,可能会继续触发到fireChannelInactive,也可能不会触发fireChannelInactive 。
除了netty根据异常类型判断是否执行close方法外,其实开发人员也可以自己通过ctx或者channel去调用close方法,代码如下:
@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof IOException) {log.warn("exceptionCaught:客户端[{}]和远程断开连接", ctx.channel().localAddress());} else {log.error(cause);}//ctx.close();ctx.channel().close();}但这种显示调用close方法,是否一定会触发调用fireChannelInactive呢?
如果是,那么只需要在exceptionCaught中调用close方法,fireChannelInactive中做重连的逻辑即可!!
在笔者通过日志观察到,在exceptionCaught中调用close方法每次都会调用fireChannelInactive方法 。但是查看源码,笔者认为这是不一定的,因为在AbstractChannel.AbstractUnsafe#close(ChannelPromise,Throwable, ClosedChannelException, notify)中会调用io.netty.channel.Channel#isActive进行判断,只有为true,才会执行fireChannelInactive方法 。
//io.netty.channel.socket.nio.NioSocketChannel#isActive@Overridepublic boolean isActive() {SocketChannel ch = javaChannel();return ch.isOpen() && ch.isConnected();}如何解决同时执行两次问题呢?
在netty初始化时,我们都会添加一系列的handler处理器,这些handler实际上会在netty创建Channel对象(NioSocketChannel)时,被封装在DefaultChannelPipeline中,而DefaultChannelPipeline实际上是一个双向链表,头节点为TailContext,尾节点为TailContext,而中间的节点则是我们添加的一个个handler(被封装成DefaultChannelHandlerContext),当执行Pipeline上的方法时,会从链表上遍历handler执行,因此当执行exceptionCaught方法时,我们只需要提前移除链表上自定义的Handler则无法执行fireChannelInactive方法 。


推荐阅读