构建 Future,以 RequestId 为 Key,Put 到线程安全的 Map 中 。Get 结果时需要写入 Time Out 超时时间,防止由于结果的未返回而导致的长时间的阻塞 。
SyncFuture<RpcResponse>syncFuture=newSyncFuture<RpcResponse>();SyncFutureCatch.syncFutureMap.put(rpcRequest.getRequestId(),syncFuture);try{RpcResponserpcResponse=syncFuture.get(timeOut,TimeUnit.MILLISECONDS);returnrpcResponse.getResult();}catch(Exceptione){throwe;}finally{SyncFutureCatch.syncFutureMap.remove(rpcRequest.getRequestId());}
结果返回时通过回传的 RequestId 获取对应 Future 写入 Response,Future 线程解除阻塞:
log.debug("TcpClientreceivehead:"+headAnalysis+"TcpClientreceivedata:"+rpcResponse);SyncFuture<RpcResponse>syncFuture=SyncFutureCatch.syncFutureMap.get(rpcResponse.getRequestId());if(syncFuture!=null){syncFuture.setResponse(rpcResponse);}
importjava.util.concurrent.CountDownLatch;importjava.util.concurrent.Future;importjava.util.concurrent.TimeUnit;publicclassSyncFuture<T>implementsFuture<T>{//因为请求和响应是一一对应的,因此初始化CountDownLatch值为1 。privateCountDownLatchlatch=newCountDownLatch(1);//需要响应线程设置的响应结果privateTresponse;//Futrue的请求时间,用于计算Future是否超时privatelongbeginTime=System.currentTimeMillis();publicSyncFuture(){}@Overridepublicbooleancancel(booleanmayInterruptIfRunning){returnfalse;}@OverridepublicbooleanisCancelled(){returnfalse;}@OverridepublicbooleanisDone(){if(response!=null){returntrue;}returnfalse;}//获取响应结果,直到有结果才返回 。@OverridepublicTget()throwsInterruptedException{latch.await();returnthis.response;}//获取响应结果,直到有结果或者超过指定时间就返回 。@OverridepublicTget(longtimeOut,TimeUnitunit)throwsInterruptedException{if(latch.await(timeOut,unit)){returnthis.response;}returnnull;}//用于设置响应结果,并且做countDown操作,通知请求线程publicvoidsetResponse(Tresponse){this.response=response;latch.countDown();}publiclonggetBeginTime(){returnbeginTime;}}
SyncFuture<RpcResponse>syncFuture=newSyncFuture<RpcResponse>();SyncFutureCatch.syncFutureMap.put(rpcRequest.getRequestId(),syncFuture);RpcResponserpcResponse=syncFuture.get(timeOut,TimeUnit.MILLISECONDS);SyncFutureCatch.syncFutureMap.remove(rpcRequest.getRequestId());
除了同步服务调用,异步服务调用,还有并行服务调用,泛化调用等调用形式 。
高可用
简单的介绍了下分布式服务框架,下面来说下分布式系统的高可用 。一个系统设计开发出来,三天两晚就出个大问题,导致无法使用,那这个系统也不是什么好系统 。
业界流传一句话:"我们系统支持 X 个 9 的可靠性" 。这个 X 是代表一个数字,X 个 9 表示在系统 1 年时间的使用过程中,系统可以正常使用时间与总时间(1 年)之比 。
3 个 9:(1-99.9%)*365*24=8.76 小时,表示该系统在连续运行 1 年时间里最多可能的业务中断时间是 8.76 小时,4 个 9 即 52.6 分钟,5 个 9 即 5.26 分钟 。要做到如此高的可靠性,是非常大的挑战 。
一个大型分布式项目可能是由几十上百个项目构成,涉及到的服务成千上万,主链上的一个流程就需要流转多个团队维护的项目 。
拿 4 个 9 的可靠性来说,平摊到每个团队的时间可能不到 10 分钟 。这 10 分钟内需要顶住压力,以最快的时间找到并解决问题,恢复系统的可用 。
下面说说为了提高系统的可靠性都有哪些方案:
服务检测:某台服务器与注册中心的连接中断,其提供的服务也无响应时,系统应该能主动去重启该服务,使其能正常对外提供 。
故障隔离:集群环境下,某台服务器能对外提供服务,但是因为其他原因,请求结果始终异常 。
这时就需要主动将该节点从集群环境中剔除,避免继续对后面的请求造成影响,非高峰时期再尝试修复该问题 。至于机房故障的情况,只能去屏蔽整个机房了 。
目前饿了么做的是异地多活,即便单边机房挂了,流量也可以全量切换至另外一边机房,保证系统的可用 。
监控:包含业务监控、服务异常监控、DB 中间件性能的监控等,系统出现异常的时候能及时的通知到开发人员 。等到线下报上来的时候,可能影响已经很大了 。
压测:产线主链路的压测是必不可少的,单靠集成测试,有些高并发的场景是无法覆盖到的,压测能暴露平常情况无法出现的问题,也能直观的提现系统的吞吐能力 。当业务激增时,可以考虑直接做系统扩容 。
推荐阅读
- 这一次,让你完全理解 HTTPS 到底是如何做到数据传输安全的
- 黑客是如何控制你手机的?出现这几种情况,你的手机可能已中招
- 经常练太极拳可以降低血糖是真的吗?
- 太极拳和阴阳五也行之间的关系是怎样的
- 你必须掌握的太极拳要领
- 太极拳居然有这功效 我震惊了
- 揭秘太极拳实战有什么作用
- 什么是掤劲 掤劲的训练方法
- 静功缠丝太极拳的创始人是谁
- 杨氏太极拳24式分别是什么