Netty客户端断线重连实现及问题思考

前言在实现TCP长连接功能中,客户端断线重连是一个很常见的问题,当我们使用netty实现断线重连时,是否考虑过如下几个问题:

  • 如何监听到客户端和服务端连接断开 ?
  • 如何实现断线后重新连接 ?
  • netty客户端线程给多大比较合理 ?
其实上面都是笔者在做断线重连时所遇到的问题,而 “netty客户端线程给多大比较合理?” 这个问题更是笔者在做断线重连时因一个异常引发的思考 。下面讲讲整个过程:
因为本节讲解内容主要涉及在客户端,但是为了读者能够运行整个程序,所以这里先给出服务端及公共的依赖和实体类 。
服务端及common代码maven依赖:
<dependencies><!--只是用到了spring-boot的日志框架--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><version>2.4.1</version></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.56.Final</version></dependency><dependency><groupId>org.jboss.marshalling</groupId><artifactId>jboss-marshalling-serial</artifactId><version>2.0.10.Final</version></dependency></dependencies>服务端业务处理代码主要用于记录打印当前客户端连接数,当接收到客户端信息后返回“hello netty”字符串
@ChannelHandler.Sharablepublic class SimpleServerHandler extends ChannelInboundHandlerAdapter {private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleServerHandler.class);public static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {channels.add(ctx.channel());log.info("客户端连接成功: client address :{}", ctx.channel().remoteAddress());log.info("当前共有{}个客户端连接", channels.size());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("server channelRead:{}", msg);ctx.channel().writeAndFlush("hello netty");}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("channelInactive: client close");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof JAVA.io.IOException) {log.warn("exceptionCaught: client close");} else {cause.printStackTrace();}}}服务端心跳检查代码当接收心跳"ping"信息后,返回客户端’'pong"信息 。如果客户端在指定时间内没有发送任何信息则关闭客户端 。
public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter {private static final InternalLogger log = InternalLoggerFactory.getInstance(ServerHeartbeatHandler.class);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("server channelRead:{}", msg);if (msg.equals("ping")) {ctx.channel().writeAndFlush("pong");} else {//由下一个handler处理,示例中则为SimpleServerHandlerctx.fireChannelRead(msg);}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {//该事件需要配合 io.netty.handler.timeout.IdleStateHandler使用IdleStateEvent idleStateEvent = (IdleStateEvent) evt;if (idleStateEvent.state() == IdleState.READER_IDLE) {//超过指定时间没有读事件,关闭连接log.info("超过心跳时间,关闭和服务端的连接:{}", ctx.channel().remoteAddress());//ctx.channel().close();}} else {super.userEventTriggered(ctx, evt);}}}编解码工具类主要使用jboss-marshalling-serial编解码工具,可自行查询其优缺点,这里只是示例使用 。
public final class MarshallingCodeFactory {/** 创建Jboss marshalling 解码器 */public static MarshallingDecoder buildMarshallingDecoder() {//参数serial表示创建的是Java序列化工厂对象,由jboss-marshalling-serial提供MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");MarshallingConfiguration configuration = new MarshallingConfiguration();configuration.setVersion(5);DefaultUnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration);return new MarshallingDecoder(provider, 1024);}/** 创建Jboss marshalling 编码器 */public static MarshallingEncoder buildMarshallingEncoder() {MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");MarshallingConfiguration configuration = new MarshallingConfiguration();configuration.setVersion(5);DefaultMarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration);return new MarshallingEncoder(provider);}}


推荐阅读