这可能是讲分布式系统最到位的一篇文章( 二 )

这可能是讲分布式系统最到位的一篇文章
文章插图
 
子节点 2
 
如下面 Zookeeper 中写入的临时顺序节点信息:

  • com.black.blackrpc.test.Helloword:发布服务时对外的名称 。
  • 00000000010,00000000011:ZK 顺序节点 ID 。
  • 127.0.0.1:8888,127.0.0.1:8889:服务地址端口 。
  • Protostuff:序列化方式 。
  • 1.0:权值,负载均衡策略使用 。
 
这里使用的是 Zookeeper 的临时顺序节点,为什么使用临时顺序节点,主要是考虑以下两点:
  • 当服务提供者异常下线时,与 Zookeeper 的连接会中断,Zookeeper 服务器会主动删除临时节点,同步给服务消费者 。这样就能避免服务消费者去请求异常的服务器 。校稿注: 一般消费方也会在实际发起请求前,对当前获取到的服务提供方节点进行心跳,避免请求连接有问题的节点 。
  • Zookeeper 下面是不允许创建 2 个名称相同的 ZK 子节点的,通过顺序节点就能避免创建相同的名称 。当然也可以不用顺序节点的方式,直接以 com.black.blackrpc.test.HelloWord 创建节点,在该节点下创建数据节点 。
 
下面是 ZK 的数据同步过程:
/***同步节点(通知模式)*syncNodes会通过级联方式,在每次watcher被触发后,就会再挂上新的watcher 。完成了类似链式触发的功能*/publicbooleansyncNodes(){try{List<String>nodeList=zk.getChildren(ZkConstant.ZK_RPC_DATA_PATH,newWatcher(){@Overridepublicvoidprocess(WatchedEventevent){if(event.getType()==Event.EventType.NodeChildrenChanged){syncNodes();}}});Map<String,List<String>>map=newHashMap<String,List<String>>();for(Stringnode:nodeList){byte[]bytes=zk.getData(ZkConstant.ZK_RPC_DATA_PATH+"/"+node,false,null);Stringkey=node.substring(0,node.lastIndexOf(ZkConstant.DELIMITED_MARKER));Stringvalue=newString(bytes);Objectobject=map.get(key);if(object!=null){((List<String>)object).add(value);}else{List<String>dataList=newArrayList<String>();dataList.add(value);map.put(key,dataList);}log.info("node:[{}]data:[{}]",node,newString(bytes));}/**修改连接的地址缓存*/if(MapUtil.isNotEmpty(map)){log.debug("invokingservicecacheupdateing....");InvokingServiceCache.updataInvokingServiceMap(map);}returntrue;}catch(KeeperException|InterruptedExceptione){log.error(e.toString());returnfalse;}} 
当数据同步到本地时,一般会写入到本地文件中,防止因 Zookeeper 集群异常下线而无法获取服务提供者信息 。
 
通讯与协议
 
服务消费者无论是与注册中心还是与服务提供者,都需要存在网络连接传输数据,而这就涉及到通讯 。
 
笔者之前也做过这方面的工作,当时使用的是 JAVA BIO 简单的写了一个通讯包,使用场景没有多大的并发,阻塞式的 BIO 也未暴露太多问题 。
 
java BIO 因其建立连接之后会阻塞线程等待数据,这种方式必须以一连接一线程的方式,即客户端有连接请求时服务器端就需要启动一个线程进行处理 。当连接数过大时,会建立相当多的线程,性能直线下降 。
 
Java NIO:同步非阻塞,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有 I/O 请求时才启动一个线程进行处理 。
 
Java AIO:异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的 I/O 请求都是由 OS 先完成了再通知服务器应用去启动线程进行处理 。
 
BIO、NIO、AIO 适用场景分析:
  • BIO:用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,但程序直观简单易理解 。
  • NIO:适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂 。目前主流的通讯框架 Netty、Apache Mina、Grizzl、NIO Framework 都是基于其实现的 。
  • AIO:用于连接数目多且连接比较长(重操作)的架构,比如图片服务器,文件传输等,充分调用 OS 参与并发操作,编程比较复杂 。
 
作为基石的通讯,其实要考虑很多东西 。如:丢包粘包的情况,心跳机制,断连重连,消息缓存重发,资源的优雅释放,长连接还是短连接等 。
 
下面是 Netty 建立服务端,客户端的简单实现:
importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.ChannelInitializer;importio.netty.channel.ChannelPipeline;importio.netty.channel.EventLoopGroup;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NIOServerSocketChannel;importio.netty.handler.codec.LengthFieldBasedFrameDecoder;importio.netty.handler.codec.LengthFieldPrepender;importio.netty.handler.codec.bytes.ByteArrayDecoder;importio.netty.handler.codec.bytes.ByteArrayEncoder;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;/****nettytcp服务端*@authorv_wangshiyu**/publicclassNettyTcpService{privatestaticfinalLoggerlog=LoggerFactory.getLogger(NettyTcpService.class);privateStringhost;privateintport;publicNettyTcpService(Stringaddress)throwsException{Stringstr[]=address.split(":");this.host=str[0];this.port=Integer.valueOf(str[1]);}publicNettyTcpService(Stringhost,intport)throwsException{this.host=host;this.port=port;}/**用于分配处理业务线程的线程组个数*/privatestaticfinalintBIZGROUPSIZE=Runtime.getRuntime().availableProcessors()*2;//默认/**业务出现线程大小*/privatestaticfinalintBIZTHREADSIZE=4;/**NioEventLoopGroup实际上就是个线程,*NioEventLoopGroup在后台启动了n个NioEventLoop来处理Channel事件,*每一个NioEventLoop负责处理m个Channel,*NioEventLoopGroup从NioEventLoop数组里挨个取出NioEventLoop来处理Channel*/privatestaticfinalEventLoopGroupbossGroup=newNioEventLoopGroup(BIZGROUPSIZE);privatestaticfinalEventLoopGroupworkerGroup=newNioEventLoopGroup(BIZTHREADSIZE);publicvoidstart()throwsException{log.info("NettyTcpServiceRun...");ServerBootstrapb=newServerBootstrap();b.group(bossGroup,workerGroup);b.channel(NioServerSocketChannel.class);b.childHandler(newChannelInitializer<SocketChannel>(){@OverridepublicvoidinitChannel(SocketChannelch)throwsException{ChannelPipelinepipeline=ch.pipeline();pipeline.addLast("frameDecoder",newLengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));pipeline.addLast("frameEncoder",newLengthFieldPrepender(4));pipeline.addLast("decoder",newByteArrayDecoder());pipeline.addLast("encoder",newByteArrayEncoder());//pipeline.addLast(newEncoder());//pipeline.addLast(newDecoder());pipeline.addLast(newTcpServerHandler());}});b.bind(host,port).sync();log.info("NettyTcpServiceSuccess!");}/***停止服务并释放资源*/publicvoidshutdown(){workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.SimpleChannelInboundHandler;/***服务端处理器*/publicclassTcpServerHandlerextendsSimpleChannelInboundHandler<Object>{privatestaticfinalLoggerlog=LoggerFactory.getLogger(TcpServerHandler.class);@OverrideprotectedvoidchannelRead0(ChannelHandlerContextctx,Objectmsg)throwsException{byte[]data=https://www.isolves.com/it/cxkf/jiagou/2020-02-22/(byte[])msg;}}


推荐阅读