Netty客户端断线重连实现及问题思考( 五 )


Netty客户端断线重连实现及问题思考

文章插图
 
最后实现代码如下:
public class SimpleClientHandler extends ChannelInboundHandlerAdapter {private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.warn("channelInactive:{}", ctx.channel().localAddress());ctx.pipeline().remove(this);ctx.channel().close();reconnection(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof IOException) {log.warn("exceptionCaught:客户端[{}]和远程断开连接", ctx.channel().localAddress());} else {log.error(cause);}ctx.pipeline().remove(this);//ctx.close();ctx.channel().close();reconnection(ctx);}}执行效果如下,可以看到当发生异常时,只是执行了exceptionCaught方法,并且通过channel关闭了上一次连接资源,也没有执行当前handler的fireChannelInactive方法 。
Netty客户端断线重连实现及问题思考

文章插图
 
如何实现断线后重新连接 ?通过上面分析,我们已经知道在什么方法中实现自己的重连逻辑,但是具体该怎么实现呢,怀着好奇的心态搜索了一下各大码友的实现方案 。大多做法是通过ctx.channel().eventLoop().schedule添加一个定时任务调用客户端的连接方法 。笔者也参考该方式实现代码如下:
private void reconnection(ChannelHandlerContext ctx) { log.info("5s之后重新建立连接"); ctx.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {boolean connect = client.connect();if (connect) {log.info("重新连接成功");} else {reconnection(ctx);}} }, 5, TimeUnit.SECONDS);}测试:先启动server端,再启动client端,待连接成功之后kill掉 server端进程 。客户端如期定时执行重连,但也就去茶水间倒杯水的时间,回来后发现了如下异常 。
......省略14条相同的重试日志[2021-01-17 18:46:45.032] INFO[nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.SimpleClientHandler] : 5s之后重新建立连接[2021-01-17 18:46:48.032] INFO[nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : 尝试连接到服务端: 127.0.0.1:8088[2021-01-17 18:46:50.038] ERROR[nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : 连接异常:Connection refused: no further information: /127.0.0.1:8088[2021-01-17 18:46:50.038] INFO[nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.SimpleClientHandler] : 5s之后重新建立连接[2021-01-17 18:46:53.040] INFO[nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : 尝试连接到服务端: 127.0.0.1:8088[2021-01-17 18:46:53.048] ERROR[nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : connect error:io.netty.util.concurrent.BlockingOperationException: DefaultChannelPromise@10122121(incomplete) at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:462) at io.netty.channel.DefaultChannelPromise.checkDeadLock(DefaultChannelPromise.java:159) at io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:667) at io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:305) at com.bruce.netty.rpc.client.NettyClient.connect(NettyClient.java:49) at com.bruce.netty.rpc.client.SimpleClientHandler$1.run(SimpleClientHandler.java:65) at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)根据异常栈,可以发现是com.bruce.netty.rpc.client.NettyClient#connect方法中调用了等待方法
boolean notTimeout = channelFuture.awaitUninterruptibly(20, TimeUnit.SECONDS);而该方法内部会进行检测,是否在io线程上执行了同步等待,这会导致抛出异常BlockingOperationException 。
@Overrideprotected void checkDeadLock() {if (channel().isRegistered()) {super.checkDeadLock();}}protected void checkDeadLock() {EventExecutor e = executor();if (e != null && e.inEventLoop()) {throw new BlockingOperationException(toString());}}奇怪的是为什么不是每次尝试重连都抛出该异常,而是每隔16次抛出一次呢?
这让我联想到自己的笔记本是8核处理器,而netty默认线程池是2 * c,就是16条线程,这之间似乎有些关联 。
实际上在调用ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088);,netty首先会创建一个io.netty.channel.Channel(示例中是NioSocketChannel),然后通过io.netty.util.concurrent.EventExecutorChooserFactory.EventExecutorChooser依次选择一个NioEventLoop,将Channel绑定到NioEventLoop上 。


推荐阅读