Akka Streams: GraphStage

分享 未结
0 1 0 8
小编 2017-08-18发布
收藏 点赞
转自:http://www.moye.me/2016/09/04/akka-streams_graphstage/

Stages

Stage 是 Akka Streams 里的核心概念,它代表了在Graph里的单位,是如何定义输入端口(Inlet) / 输出端口(Outlet) 的;常用的基础形态有:

compose_shapes1

  • Source: 只有一个Outlet
  • Sink: 只有一个Inlet
  • Flow: 只有一个Outlet和一个Inlet
  • Fan-In: 多个Inlet一个Outlet
  • Fan-Out: 一个Inlet多个Outlet
  • BidiFlow: 双向通道,一对Inlet一对Outlet

有了这些基础件,我们可以按需组合出各种拓扑的图来:

runnable_graph1

自定义流处理

之前介绍过,Akka Streams 内置支持 back-pressure 特性:可根据上下游 Stage 的处理能力,进行推/拉 速率的流控。同时,Akka Stream 也允许我们通过自定义 GraphStage 进行流处理细节的控制。

GraphStage 的定义如下:

A GraphStage represents a reusable graph stream processing stage. A GraphStage consists of a Shape which describes its input and output ports and a factory function that creates a GraphStageLogic which implements the processing logic that ties the ports together.


可以看出,它需要一个Shape来描述 输入(Inlet) 和 输出(Outlet) 端口,以及一个 GraphStageLogic 来实现串联端口的逻辑(数据是如何在端口间流动)

Shape 即 Stage 的表现形式,常用的有:

  • SourceShape: 只有一个Outlet
  • SinkShape: 只有一个Inlet
  • FlowShape: 只有一个Outlet和一个Inlet
  • ……

GraphStageLogic 是用于管理端口事件和回调的状态机,通常来说,在不同的Shape里,它根据Inlet和Outlet 的数量不同,按需提供 流输入事件的回调 InHandler 和 流输出事件的回调 OutHandler:

OutHandler

输出端口可用的操作:

  • push(out,elem) 发送数据到输出端口(当下游进行拉取时可用)
  • complete(out) 关闭输出端口
  • fail(out,exception) 关闭输出端口,并附上错误信息

在 GraphStageLogic 内使用 setHandler(out,handler)为输出端口注册 OutHandler 实例,可处理如下回调:

  • onPull() 在输出端口准备好发送下一条数据时被调用,push(out,elem) 此时将可被用于此端口进行数据的推送
  • onDownstreamFinish() 在下游处于被取消状态或不再接收消息时被调用。此后将不会再有 onPull() 事件发生。此回调如未被重载,当前Stage将在此时完成

输出端口可用的状态查询方法:

  • isAvailable(out) 端口是否允许发送数据
  • isClosed(out) 端口是否已关闭(此时不能再推送和拉取)

OutHandler 状态机如图:

outport_transitions1


InHandler

输入端口可用的操作:

  • pull(in) 向输入端口请求一条新的数据(当上游推送数据时可用)
  • grab(in) 在 onPush()回调里,获取接收到的数据的内容(不能被重复调用,除非上游再次发送新的数据)
  • cancel(in) 关闭输入端口

在 GraphStageLogic 内使用 setHandler(in,handler)为输出端口注册 InHandler 实例,可处理如下回调:

  • onPush() 在输入端口有一条新数据到达时被调用。此时可使用grab(in) 获取数据内容,使用 pull(in) 请求下一条数据(流控在此实现)。可以不使用grab(in) 获取 而直接 pull(in) 请求新数据,此时旧的数据将从缓冲区被抹掉
  • onUpstreamFinish() 在上游关闭或再无数据可获取时被调用。此后将不再有 onPush() 事件发生。此回调如未被重载,当前Stage将在此时完成
  • onUpstreamFailure() 在上游出错时被调用,回调将带着一个异常参数。此后不再有 onPush() 事件发生。此回调如未被重载,当前Stage将在此时失败

输入端口可用的状态查询方法:

  • isAvailable(in) 端口是否有数据可获取
  • hasBeenPulled(in) 端口是否已经pull过了,如果为true, 再调用 pull(in) 将会出错
  • isClosed(in) 端口是否已关闭

InHandler 状态机如图:

inport_transitions1


GraphStage实例

流过滤器Filter

基于GraphStage,我们可以为FlowShpe通道提供筛选函数,实现一个数据流筛选器:graph_stage_filter1

实现:

Scala

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}
import akka.NotUsed
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
 
object FilterStage_ extends App {
  // 过滤器实现
  class Filter[A, B](f: A => Boolean) extends GraphStage[FlowShape[A, B]] {
    val in = Inlet[A]("Filter.in")
    val out = Outlet[B]("Filter.out")
 
    override val shape = FlowShape.of(in, out)
 
    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        setHandler(in, new InHandler {
          @scala.throws[Exception](classOf[Exception])
          override def onPush(): Unit = {
            val elem = grab(in)
            if(f(elem)) {
              push(out, elem.asInstanceOf[B])
            }
          }
        })
 
        setHandler(out, new OutHandler {
          @scala.throws[Exception](classOf[Exception])
          override def onPull(): Unit = {
            pull(in)
          }
        })
      }
  }
 
  // 调用部分
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()
 
  val flowGraph: Graph[FlowShape[Int, Int], NotUsed] =
    new Filter[Int, Int]((a:Int) => a < 100)
  val result = Source.fromIterator(() => Iterator from 5)
    .via(flowGraph)
    .to(Sink.foreach(println))
  result.run()
  Thread.sleep(300)
 
  system.terminate()
}

输出:

5
6
...
98
99

定时器流控阀

TimerGraphStageLogi 继承自 GraphStageLogic,它提供了定时器回调接口:

  • scheduleOnce(key,delay) 下一次定时回调
  • schedulePeriodically(key,period) 周期性的定时回调
  • schedulePeriodicallyWithInitialDelay(key,delay,period) 带初始值的周期性定时回调

定时器会触发onTimer(key)回调,在其中可置入定时处理逻辑

通过使用这些接口,我们可以轻松实现如 RX.Debounce 一般的流控功能。

实例:迭代50000个数,1毫秒一次进行分组,显示分组的长度

Scala

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage._
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._
 
object TimedGate_ extends App {
  // 定时器流控
  class TimedGate[A](silencePeriod: FiniteDuration)
    extends GraphStage[FlowShape[A, List[A]]] {
 
    val in = Inlet[A]("TimedGate.In")
    val out = Outlet[List[A]]("TimedGate.Out")
 
    override val shape = FlowShape.of(in, out)
 
    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new TimerGraphStageLogic(shape) {
        var open = false
        var buffer = ListBuffer[A]()
 
        setHandler(in, new InHandler {
          @scala.throws[Exception](classOf[Exception])
          override def onPush(): Unit = {
            val elem = grab(in)
            buffer += elem
            if (open) {
              pull(in)
            }
            else {
              push(out, buffer.toList)
              buffer = ListBuffer[A]()
              open = true
              scheduleOnce(None, silencePeriod)
            }
          }
 
          override def onUpstreamFinish(): Unit = {
            if (buffer.nonEmpty) {
              emit(out, buffer.toList)
            }
            complete(out)
          }
        })
 
        setHandler(out, new OutHandler {
          @scala.throws[Exception](classOf[Exception])
          override def onPull(): Unit = {
            pull(in)
          }
        })
 
        override def onTimer(timerKey: Any): Unit = {
          open = false
        }
      }
  }
 
  // 调用部分
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()
 
  val result = Source.fromIterator(() => Iterator from 1)
    .take(50000)
    .via(new TimedGate[Int](1 millis))
    .runWith(Sink.seq)
 
  val seqs = Await.result(result, 10 seconds)
  seqs.foreach(seq => println(seq.length))
 
  system.terminate()
}

结果随机,不列举

回帖
  • 消灭零回复