Netty客户端断线重连实现及问题思考( 二 )

公共实体类public class UserInfo implements Serializable {private static final long serialVersionUID = 6271330872494117382L;private String username;private int age;public UserInfo() {}public UserInfo(String username, int age) {this.username = username;this.age = age;}//省略getter/setter/toString}下面开始本文的重点,客户端断线重连以及问题思考 。
客户端实现

  • 刚开始启动时需要进行同步连接,指定连接次数内没用通过则抛出异常,进程退出 。
  • 客户端启动后,开启定时任务,模拟客户端数据发送 。
客户端业务处理handler,接收到数据后,通过日志打印 。
public class SimpleClientHandler extends ChannelInboundHandlerAdapter {private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);private NettyClient client;public SimpleClientHandler(NettyClient client) {this.client = client;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("client receive:{}", msg);}}封装连接方法、断开连接方法、getChannel()返回io.netty.channel.Channel用于向服务端发送数据 。boolean connect()是一个同步连接方法,如果连接成功返回true,连接失败返回false 。
public class NettyClient {private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);private EventLoopGroup workerGroup;private Bootstrap bootstrap;private volatile Channel clientChannel;public NettyClient() {this(-1);}public NettyClient(int threads) {workerGroup = threads > 0 ? new NioEventLoopGroup(threads) : new NioEventLoopGroup();bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NIOSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, false).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000).handler(new ClientHandlerInitializer(this));}public boolean connect() {log.info("尝试连接到服务端: 127.0.0.1:8088");try {ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088);boolean notTimeout = channelFuture.awaitUninterruptibly(30, TimeUnit.SECONDS);clientChannel = channelFuture.channel();if (notTimeout) {if (clientChannel != null && clientChannel.isActive()) {log.info("netty client started !!! {} connect to server", clientChannel.localAddress());return true;}Throwable cause = channelFuture.cause();if (cause != null) {exceptionHandler(cause);}} else {log.warn("connect remote host[{}] timeout {}s", clientChannel.remoteAddress(), 30);}} catch (Exception e) {exceptionHandler(e);}clientChannel.close();return false;}private void exceptionHandler(Throwable cause) {if (cause instanceof ConnectException) {log.error("连接异常:{}", cause.getMessage());} else if (cause instanceof ClosedChannelException) {log.error("connect error:{}", "client has destroy");} else {log.error("connect error:", cause);}}public void close() {if (clientChannel != null) {clientChannel.close();}if (workerGroup != null) {workerGroup.shutdownGracefully();}}public Channel getChannel() {return clientChannel;}static class ClientHandlerInitializer extends ChannelInitializer<SocketChannel> {private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);private NettyClient client;public ClientHandlerInitializer(NettyClient client) {this.client = client;}@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());//pipeline.addLast(new IdleStateHandler(25, 0, 10));//pipeline.addLast(new ClientHeartbeatHandler());pipeline.addLast(new SimpleClientHandler(client));}}}客户端启动类
public class NettyClientMain {private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClientMain.class);private static final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();public static void main(String[] args) {NettyClient nettyClient = new NettyClient();boolean connect = false;//刚启动时尝试连接10次,都无法建立连接则不在尝试//如果想在刚启动后,一直尝试连接,需要放在线程中,异步执行,防止阻塞程序for (int i = 0; i < 10; i++) {connect = nettyClient.connect();if (connect) {break;}//连接不成功,隔5s之后重新尝试连接try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}if (connect) {log.info("定时发送数据");send(nettyClient);} else {nettyClient.close();log.info("进程退出");}}/** 定时发送数据 */static void send(NettyClient client) {scheduledExecutor.schedule(new SendTask(client,scheduledExecutor), 2, TimeUnit.SECONDS);}}客户端断线重连断线重连需求:


推荐阅读