Akka in Schedulerx2.0

分享 未结
1 1 0 6
小编 2019-08-05发布
收藏 点赞
来源: 阿里分布式任务调度

1. 前言

Schedulerx2.0是阿里中间件自研的基于akka架构的新一代分布式任务调度平台,提供定时、任务编排、分布式跑批等功能,具有高可靠、海量任务、秒级调度等能力。

本篇文章以Schedulerx2.0为例子,介绍akka的应用场景,希望能给同样从事分布式系统开发的同学一些启发。这里不详细介绍akka,初学者可以直接阅读官方文档(https://doc.akka.io/docs/akka/current/index.html?language=java)。

2. Reactive

说到近几年火热的反应式编程,谁都能说几句“异步、并发、非阻塞、高性能”等等,说到有代表性的项目,大家都知道RxJava、Akka、Reactor。

Why Reactive?

——因为Schedulerx2.0作为任务调度平台,支持海量任务调度,提供任务状态机感知任务状态变化,需要Reactive的特性。

Why Akka?

——首先akka很简单,每个actor只需要实现一个onReceive方法。其次,Akka真的非常强大!我们可以看下官方文档(https://doc.akka.io/docs/akka/current/index.html?language=java),Akka几乎提供了一整套解决方案,使用akka可以很方便的实现一套高可靠、高并发、高性能的分布式系统。Schedulerx2.0也只用到了akka生态圈里的一小部分功能:

  • akka-actor
  • akka-eventbus:实现高性能工作流引擎
  • akka-remoting:实现进程间通信
  • akka-persistence:实现消息的At-Least-Once Delivery

3. Akka-actor in Schedulerx2.0

Schedulerx2.0支持百万级别任务,一天上亿次调度,从架构上来说,主要是

server无状态,可水平扩展

基于akka-actor模型,单机性能高

Schedulerx2.0提供任务状态机,如下图

当有海量任务汇报任务状态,单线程肯定是处理不过来的。如果用线程池又会遇到并发问题,比如当前按顺序收到如下消息:

msg1: Instance=100 running

msg2: Instance=101 running

msg3: Instance=102 failed

msg4: Instance=101 success

msg5: Instance=100 failed

有可能instance=100先变成failed,最后变成running,导致状态机错误。

通过Akka-actor架构的模型,可以很容易处理这种场景:

如上图所示,JobInstanceRoutingActor作为路由actor,用来转发消息。下面挂载了很多jobInstanceActor,用来真实处理消息。

所有instance状态的消息都发给JobInstanceRoutingActor,路由actor会把同一个instanceId的消息发给同一个jobInstanceActor,akka能保证一个actor按照消息接收的顺序来处理消息,以此又能保证整个状态机消息的顺序性。

Schedulerx2.0中,大量采用了上面这种模型,来支撑job/workflow/instance等消息的传递。

4. 基于Akka-eventbus的Pub-Sub模式

在异步处理场景中,当然少不了Pub-Sub模式。相信很多人都用过guava的eventbus,可以很简单很优雅的实现一套基于事件驱动的解决方案。通过@Subscribe注解就能注册要订阅的事件,通过@AllowConcurrentEvents注解还能设置并发消费事件。但是guava-eventbus在实现并发消费事件的时候非常暴力,公用一个线程池。这在Schedulerx2.0的应用场景中不太合适,比如某个job触发频率特别高,可能整个线程池都被他占满了,造成其他job饿死。

在项目中大量使用actor模型之后,如果使用原生的actor通信会发现很困难,因为得知道actor的地址才能和他通信。如果有些actor要给多个actor发送消息,你的项目就会变成一个网状的结构,新增一个actor经常会漏掉一些通信。这个时候我们就会想到Pub-Sub模式,所有actor通信只需要给事件总线发送消息,每个actor只需要订阅自己的事件就好了。

如上图所示,定时调度器、工作流引擎、任务状态机等大部分模块,都由akka-eventbus进行管理,每个模块都是第四节定义的路由actor+业务actor的模型。通过该模型,相同的job交给同一个actor处理,不会堵塞其他actor,同样解决了上文提到的guava-eventbus公用线程池的问题。实现类图如下:

5. 两行代码实现进程间通信

Schedulerx2.0是Server-Worker的架构,server和worker,worker和worker都需要进行通信,使用akka-remoting可以很容易实现任意2个进程之间的通信。

Akka-remoting是peer-to-peer的通信方式,每个节点都会暴露一个远程地址,其他节点只要知道地址,就能进行远程通信。Akka-remoting也抽象成一个actor,会让你的程序保持高度的一致,只不过这个actor的地址是远程的地址而已。Akka-remoting支持多种协议,使用起来非常简单,以netty-tcp为例,首先我们在server端定义一个配置文件akka-server.conf

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
        port = 52014
    }
  }
}

Server只需要2行代码就可以起一个remote actor

ActorSystem actorSystem = ActorSystem.create("server", akkaConfig);
actorSystem.actorOf(HelloActor.props(), "hello");

Worker也只需要2行代码就能实现和server通信

ActorSelection helloSelection = context.actorSelection("akka.tcp://server@xx.xx.xx.xx:52014/user/hello");
helloSelection.tell("hello",getSelf());

对比Schedulerx1.0使用原生netty框架通信需要如下这么多代码

怎么样,使用akka进行远程通信,是不是非常简单和优雅^^

6. 消息At-Least-Once Delivery

Akka默认的消息传递是最多传递一次,即通过tell,如果发送失败,不会重发。At-Least-Once Delivery,提供了一个消息至少传递一次的语义,即保证不丢!这在Schedulerx2.0中很多场景是非常需要的,比如某个实例在worker执行成功了,汇报成功的时候server正好重启了导致汇报失败,会造成工作流下游都卡住没法继续执行。

使用At-Least-Once Delivery要继承UntypedPersistentActorWithAtLeastOnceDelivery(akka-2.4.x)或者AbstractPersistentActorWithAtLeastOnceDelivery(akka-2.5.x)。Akka在2.5.x为了拥抱函数式编程,只支持java8,并用了很多stream的接口,所以接口和2.4.x已经大大不一样了。在Schedulerx2.0中,worker主要是给用户用的,为了兼容低版本的jdk,所以用了2.4.x版本的UntypedPersistentActorWithAtLeastOnceDelivery。

UntypedPersistentActorWithAtLeastOnceDelivery继承UntypedPersistentActor和AtLeastOnceDelivery。

  • UntypedPersistentActor:提供了持久化的actor,对消息持久化、恢复等能力。
  • AtLeastOnceDelivery:主要是deliver、confirmDelivery(long deliveryId)两个接口。

AtLeastOnceDelivery的原理非常简单,worker向server汇报状态的时候,tell改为deliver,deliver会自动生成一个deliveryId,封装进request发送给server,server需要实现把deliveryId封装到response中并返回给worker,worker收到response的时候调用confiremDelivery,会从unconfirmed列表中移除这个deliveryId的request,否则AtLeastOnceDelivery会有一个timer,定期重试这条request。如下图



回帖
  • 消灭零回复