1. 前言
对于Flink中各个组件(JobMaster、TaskManager、Dispatcher等),其底层RPC框架基于Akka实现,本文着重分析Flink中的Rpc框架实现机制及梳理其通信流程 。
2. Akka介绍
由于Flink底层Rpc是基于Akka实现,我们先了解下Akka的基本使用 。
Akka是一个开发并发、容错和可伸缩应用的框架 。它是Actor Model的一个实现,和Erlang的并发模型很像 。在Actor模型中,所有的实体被认为是独立的actors 。actors和其他actors通过发送异步消息通信 。Actor模型的强大来自于异步 。它也可以显式等待响应,这使得可以执行同步操作 。但是,强烈不建议同步消息,因为它们限制了系统的伸缩性 。每个actor有一个邮箱(mailbox),它收到的消息存储在里面 。另外,每一个actor维护自身单独的状态 。一个Actors网络如下所示:
文章插图
每个actor是一个单一的线程,它不断地从其邮箱中poll(拉取)消息,并且连续不断地处理 。对于已经处理过的消息的结果,actor可以改变它自身的内部状态或者发送一个新消息或者孵化一个新的actor 。尽管单个的actor是自然有序的,但一个包含若干个actor的系统却是高度并发的并且极具扩展性的 。因为那些处理线程是所有actor之间共享的 。这也是我们为什么不该在actor线程里调用可能导致阻塞的“调用” 。因为这样的调用可能会阻塞该线程使得他们无法替其他actor处理消息 。
2.1. 创建Akka系统
Akka系统的核心ActorSystem和Actor,若需构建一个Akka系统,首先需要创建ActorSystem,创建完ActorSystem后,可通过其创建Actor(注意:Akka不允许直接new一个Actor,只能通过 Akka 提供的某些 API 才能创建或查找 Actor,一般会通过 ActorSystem#actorOf和ActorContext#actorOf来创建 Actor),另外,我们只能通过ActorRef(Actor的引用,其对原生的 Actor 实例做了良好的封装,外界不能随意修改其内部状态)来与Actor进行通信 。如下代码展示了如何配置一个Akka系统 。
// 1. 构建ActorSystem// 使用缺省配置ActorSystem system = ActorSystem.create("sys");// 也可显示指定Appsys配置// ActorSystem system1 = ActorSystem.create("helloakka", ConfigFactory.load("appsys"));// 2. 构建Actor,获取该Actor的引用,即ActorRefActorRef helloActor = system.actorOf(Props.create(HelloActor.class), "helloActor");// 3. 给helloActor发送消息helloActor.tell("hello helloActor", ActorRef.noSender());// 4. 关闭ActorSystemsystem.terminate();在Akka中,创建的每个Actor都有自己的路径,该路径遵循 ActorSystem 的层级结构,大致如下:
本地:akka://sys/user/helloActor远程:akka.tcp://sys@l27.0.0.1:2020/user/remoteActor其中本地路径含义如下:
- sys,创建的ActorSystem的名字;
- user,通过ActorSystem#actorOf和ActorContext#actorOf 方法创建的 Actor 都属于/user下,与/user对应的是/system,其是系统层面创建的,与系统整体行为有关,在开发阶段并不需要对其过多关注
- helloActor,我们创建的HelloActor
- akka.tcp,远程通信方式为tcp;
- sys@127.0.0.1:2020,ActorSystem名字及远程主机ip和端口号 。
2.2. 根据path获取Actor
若提供了Actor的路径,可以通过路径获取到ActorRef,然后与之通信,代码如下所示:
ActorSystem system = ActorSystem.create("sys");ActorSelection as = system.actorSelection("/path/to/actor");Timeout timeout = new Timeout(Duration.create(2, "seconds"));Future<ActorRef> fu = as.resolveOne(timeout);fu.onSuccess(new OnSuccess<ActorRef>() { @Overridepublic void onSuccess(ActorRef actor) { System.out.println("actor:" + actor); actor.tell("hello actor", ActorRef.noSender()); }}, system.dispatcher());fu.onFailure(new OnFailure() { @Overridepublic void onFailure(Throwable failure) { System.out.println("failure:" + failure); }}, system.dispatcher());由上面可知,若需要与远端Actor通信,路径中必须提供ip:port 。
2.3. 与Actor通信
2.3.1. tell方式
当使用tell方式时,表示仅仅使用异步方式给某个Actor发送消息,无需等待Actor的响应结果,并且也不会阻塞后续代码的运行,如:
helloActor.tell("hello helloActor", ActorRef.noSender());
其中:第一个参数为消息,它可以是任何可序列化的数据或对象,第二个参数表示发送者,通常来讲是另外一个 Actor 的引用,ActorRef.noSender()表示无发送者((实际上是一个 叫做deadLetters的Actor) 。
推荐阅读
- 总结Java中return语句的用法
- 梦见狐狸扑向自己还想咬是什么意思 梦见狐狸扑向自己吓醒
- 2022天猫圣诞活动大还是元旦活动大,圣诞和元旦哪个打折多
- 编发|刘欢夫妇走机场,妻子扎马尾辫戴头巾真年轻,比他还有艺术家气质
- 电动晾衣架好还是手摇晾衣架好 晾衣架电动的好还是手摇的好
- 一道简单面试题引出的Java数据类型连环问
- 梦见牙要掉还没掉下来 梦见自己的牙要掉了还连着
- 梦见自己的左腿断了 梦到自己右腿断了还能走
- 梦到发现了古墓 梦见自己发现了古墓,还有干尸
- 分析MySQL应用架构发展演变史