Netty客户端断线重连实现及问题思考( 三 )


  • 服务端和客户端之间网络异常,或响应超时(例如有个很长时间的fullGC),客户端需要主动重连其他节点 。
  • 服务端宕机时或者和客户端之间发生任何异常时,客户端需要主动重连其他节点 。
  • 服务端主动向客户端发送(服务端)下线通知时,客户端需要主动重连其他节点 。
如何监听到客户端和服务端连接断开 ?netty的io.netty.channel.ChannelInboundHandler接口中给我们提供了许多重要的接口方法 。为了避免实现全部的接口方法,可以通过继承io.netty.channel.ChannelInboundHandlerAdapter来重写相应的方法即可 。
1.void channelInactive(ChannelHandlerContext ctx);在客户端关闭时被调用,表示客户端断开连接 。当有以下几种情况发生时会触发:
  • 客户端在正常active状态下,主动调用channel或者ctx的close方法 。
  • 服务端主动调用channel或者ctx的close方法关闭客户端的连接。
  • 发生java.io.IOException(一般情况下是双方连接断开)或者java.lang.OutOfMemoryError(4.1.52版本中新增)时
2.void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;则是在入栈发生任何异常时被调用 。如果异常是java.io.IOException或者java.lang.OutOfMemoryError(4.1.52版本新增)时,还会触发channelInactive方法,也就是上面channelInactive被触发的第3条情况 。
3.心跳检查也是检查客户端与服务端之间连接状态的必要方式,因为在一些状态下,两端实际上已经断开连接,但客户端无法感知,这时候就需要通过心跳来判断两端的连接状态 。心跳可以是客户端心跳和服务端心跳 。
  • 客户端信跳:即为客户端发送心跳ping信息,服务端回复pong信息 。这样在指定时间内,双方有数据交互则认为是正常连接状态 。
  • 服务端信息:则是服务端向客户端发送ping信息,客户端回复pong信息 。在指定时间内没有收到回复,则认为对方下线 。
netty给我们提供了非常简单的心跳检查方式,只需要在channel的handler链上,添加io.netty.handler.timeout.IdleStateHandler即可实现 。
IdleStateHandler有如下几个重要的参数:
  • readerIdleTimeSeconds, 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个READER_IDLE的IdleStateEvent 事件.
  • writerIdleTimeSeconds, 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个WRITER_IDLE的IdleStateEvent 事件.
  • allIdleTimeSeconds, 读/写超时. 当在指定的时间间隔内没有读或写操作时, 会触发一个ALL_IDLE的IdleStateEvent 事件.
为了能够监听到这些事件的触发,还需要重写ChannelInboundHandler#userEventTriggered(ChannelHandlerContext ctx, Object evt)方法,通过参数evt判断事件类型 。在指定的时间内如果没有读写则发送一条心跳的ping请求,在指定时间内没有收到读操作则任务已经和服务端断开连接 。则调用channel或者ctx的close方法,使客户端Handler执行channelInactive方法 。
到这里看来我们只要在channelInactive和exceptionCaught两个方法中实现自己的重连逻辑即可,但是笔者遇到了第一个坑,重连方法执行了两次 。
先看示例代码和结果,在com.bruce.netty.rpc.client.SimpleClientHandler中添加如下代码:
public class SimpleClientHandler extends ChannelInboundHandlerAdapter {private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);//省略部分代码....../** 客户端正常下线时执行该方法 */@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.warn("channelInactive:{}", ctx.channel().localAddress());reconnection(ctx);}/** 入栈发生异常时执行exceptionCaught */@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof IOException) {log.warn("exceptionCaught:客户端[{}]和远程断开连接", ctx.channel().localAddress());} else {log.error(cause);}reconnection(ctx);}private void reconnection(ChannelHandlerContext ctx) {log.info("5s之后重新建立连接");//暂时为空实现}}ClientHandlerInitializer 中添加io.netty.handler.timeout.IdleStateHandler用于心跳检查,ClientHeartbeatHandler用于监听心跳事件,接收心跳pong回复 。
static class ClientHandlerInitializer extends ChannelInitializer<SocketChannel> {private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);private NettyClient client;public ClientHandlerInitializer(NettyClient client) {this.client = client;}@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());//25s内没有read操作则触发READER_IDLE事件//10s内既没有read又没有write操作则触发ALL_IDLE事件pipeline.addLast(new IdleStateHandler(25, 0, 10));pipeline.addLast(new ClientHeartbeatHandler());pipeline.addLast(new SimpleClientHandler(client));}}


推荐阅读