Akka Distributed Data Deep Dive

分享 未结
1 1 1 4
小编 31天前发布
收藏 点赞
来源: 简单易懂の现代魔法

一致性难题

CAP 定理阐述了在构建分布式系统时候,Consistency(一致性), Availability(可用性)Partition tolerance(分区容错性),这三者只能取二。

计算给我们再多的资源(常常以为可以用空间来换取时间,比如加内存,加物理机,甚至加钱),受制于目前计算机体系(多核CPU),理论上不可能三者都满足。

CAP 理论通常都会表述如下:在一致性、可用性和分区容错性这三个特性中,一个分布式系统只能够选择满足其中两个。分区容错性是我们必须要满足的,它表示如果数据被冗余备份到三个节点,那么如果其中一个节点暂时变得不可用,而另两个节点仍然能够正常运行,那么就认为系统具备分区容错性。

假设我们希望系统具备分区容错性,只在可用性和一致性之间进行妥协。读者可能会问为什么。在接收到一个有超时限制的请求时,如果节点不可用,我们其实就需要在两种方案之间进行选择:要么返回错误(选择一致性),要么继续,即使服务器之间可能会不一致(选择可用性)。等待的时间过长会导致该请求被抛弃,所以时间是一个重要的因素,系统必须要在上面两者中做出决定。

为了达到高可用,就必须满足最终一致性;为了达到最终一致性,就会牺牲可用性,数据插入后并不得到一个真实的返回。这里就出现了一致性问题,即要求强一致性时,系统非常非常慢,因为事件A的操作完成后,才能执行后续操作;若是要求最终一致性,多个节点如果没有同步副本,执行查询并不能得到希望的结果,因为我们并不知道副本什么时候同步完成。

现在认为,只要是最终一致性的就可以接受的范围。并且你会发现,所有力求最终一致性的分布式系统,都应该使用CRDT。


分布式事务常见的有几种解决方案:
  • 两阶段提交(2PC)
  • 补偿事务(TCC)
  • 本地消息表(异步确保)
  • MQ 事务消息
  • Sagas 事务模型
具体依赖于系统的实现逻辑方案来决定。这里撇开分布式事务不谈。

Conflict Free Replicated Data Types - CRDT(无冲突复制数据类型)

CRDT,顾名思义就是一类数据结构。指的是副本可以被同步到各个节点上,副本(replicas)与副本之间更新不需要依赖调停器(coordination)。有两种方式实现CRDT,Operation-based CRDTs 和 State-based CRDTs,两种方式都能实现强最终一致性(strong eventual consistencey)。

根据Shapiro本人的论文2,两种实现都是等价的,都可以相互模拟。

State-based replication: 当一个副本(replica)收到来自客户端的一个Update时,首先更新自己的状态,之后将自身的所有状态发送给另一个副本。间接性地,每个副本都将自己的full state发送给系统内的其它副本。当一个副本收到来自其它副本的状态,提供一个Merge函数,将自身本地的状态和接收到的状态合并。如下所示,如果集合的值的状态可以表示为一个半格(semi-lattice )(半格 —— 一个偏序集,带有连续/最小上界)并且更新是递增的(譬如,状态是一个整数,更新是一个自增操作),并且如果Merge函数计算最小上界,则可以确保副本已经合并了相同的值(接近最小上界)。要让系统尽可能成为一个半格Merge操作必须满足等幂(idempotent)、结合律(associative)、交换律(commutative)。一个副本对象满足这些特征的CRDT,称为CvRDT(convergent replicated data type)

Operation-based replication: 该方式并没有将一个副本的全部状态发送给另一个副本,而是通过广播的形式将Update操作发送给系统的其他副本,并由它们重演更新操作(类似于状态机复制 )。因为是广播操作,如果有两个更新,u_1 和 u_2,作用于副本 i 上,然后副本 i 将这两个更新发送给另外两个副本 r_1 和 r_2 ,这时更新到达副本的顺序是不同的,r_1_可能先收到_u_1u_2 ,r_2 可能是先收到 u_2 然后 u_1 。如何进行合并?不管顺序如何,这些更新是满足交换律的,副本最后得到的状态是相同的。类似于这种广播的形式发送Update到所有副本,这一类对象称之为CmRDT(commutative replicated data type)

CRDT解决了分布式系统的一个有趣而又基本的问题,但是有一些限制。CRDT不实现一致性,只针对部分问题空间的Update操作的交换,而不是所有。因此并不是所有问题都可以转化为CRDT。

关于CRDT的研究, Shapiro论文提出了几点理论3

  • (Convergent Replicated Data Type (CvRDT)). Assuming eventual delivery and termination, any state-based object that satisfies the monotonic semilattice property is SEC.
  • (Commutative Replicated Data Type (CmRDT)). Assuming causal delivery of updates and method termination, any op-based object that satisfies the commutativity property for all concurrent updates, and whose delivery precondition is satisfied by causal delivery, is SEC.
  • (CvRDT emulation). Any SEC op-based object can be emulated by a SEC state-based object of a corresponding interface.

Akka集群分片

作为抛砖引玉,这里先介绍Akka Sharding的实现方式,再来介绍Akka Distributed Data。

Akka Sharding也是一种分布式集群的实现4,集群分片主要应用于需要有大量带有状态的Actor处理大量数据的情形。因为要处理Actor的状态,所以很多例子都要求分片上的Actor都是继承了PersistenceActor。为了避免混淆,我们这里不引入PersistenceActor,单独阐述Cluster Sharding的机制。

由于集群分片需要实现位置透明,即ShardRegion.ExtractEntityId需要通过EntityId知会分片Actor的具体位置,从而向对应的Actor发送消息;当然你也可以不需要知会Actor的位置。

object CounterClusterConfig {

  final case class EntityEnvelope(entityId: String, payload: Any)
  final case class Get(entityId: String)
  val numberOfShards = 100 // 最大分片数

  val extractEntityId: ShardRegion.ExtractEntityId = {
    case EntityEnvelope(entityId, payload) => (entityId, payload)
    case msg@Get(entityId) => (entityId, msg)
  }

  val extractShardId: ShardRegion.ExtractShardId = {
    case EntityEnvelope(entityId, _) => (math.abs(entityId.hashCode) % numberOfShards).toString
    case Get(entityId) => (math.abs(entityId.hashCode) % numberOfShards).toString
    case ShardRegion.StartEntity(entityId) =>
      // StartEntity is used by remembering entities feature
      (math.abs(entityId.hashCode) % numberOfShards).toString
  }
}

object Counter {
  case object Increment
  case object Decrement

  case object Stop
  final case class CounterChanged(delta: Int)
}

class Counter extends Actor with ActorLogging {

  import com.lightbend.akka.np_sharding.Counter._
  import com.lightbend.akka.np_sharding.CounterClusterConfig.Get

  import scala.concurrent.duration._

  context.setReceiveTimeout(120.seconds)

  val entityType: String = getClass.getSimpleName
  val entityId  : String = self.path.name

  override def preStart(): Unit = {
    log.debug("==>1. I was created...")
  }

  override def postStop(): Unit = {
    log.debug("==>4. I was stop...")
  }

  override def receive: Receive = {
    case Get(id) =>
      log.debug(s"==>2. $id established...")
    case it@Increment =>
      log.debug("==>3. I am increase...")
      context stop self
  }
}

ShardRegion每次收到来自客户端的消息时,如果分片中没有对应ID的Actor,则会先创建该Actor,然后发送消息;若存在该ID的Actor,则消息直接被Actor处理。

val counterRegion: ActorRef = ClusterSharding(system).start(
  typeName = "Counter",
  entityProps = Props[Counter],
  settings = ClusterShardingSettings(system),
  extractEntityId = CounterClusterConfig.extractEntityId,
  extractShardId = CounterClusterConfig.extractShardId)

// scheduler sender message
system.scheduler.schedule(0.second,
  5.seconds,
  new Runnable {
    override def run(): Unit = {
      val id = java.util.UUID.randomUUID.toString
      counterRegion ! Get(id)
      counterRegion ! EntityEnvelope(id, Increment)
    }
  }
)

sys.addShutdownHook {
  Await.result(system.whenTerminated, Duration.Inf)
}

分片Region类似于线程池,既然可以重复利用,为什么不创建一个监督机制。这样每个Actor都是位置透明的。

class CounterSupervisor extends Actor {
  val counter = context.actorOf(Props[Counter], "theCounter")

  override val supervisorStrategy = OneForOneStrategy() {
    case _: IllegalArgumentException     ⇒ SupervisorStrategy.Resume
    case _: ActorInitializationException ⇒ SupervisorStrategy.Stop
    case _: DeathPactException           ⇒ SupervisorStrategy.Stop
    case _: Exception                    ⇒ SupervisorStrategy.Restart
  }

  def receive = {
    case msg ⇒ counter forward msg
  }
}

默认地,Akka Sharding使用的是Distributed Data Mode

akka.cluster.sharding.state-store-mode = ddata

你也可以使用Persistence Mode,

akka.cluster.sharding.state-store-mode = persistence

这个描述的Distributed Data Mode跟接下来的 Akka Distribted Data并不是一回事。关于更多Akka Sharding 的技术细节可以参考官网。

Akka Distributed Data

Akka Distributed Data 是一个用于节点间共享数据的模组。它被设计成Key-Value存储,Value实现了CRDT(Conflict Free Replicated Data Types)。它允许节点的数据可以从其它任意节点更新,二不需要经过Coordinator处理,CRDT的Value总是合并(converge)。

前面已经描述了CRDT的原理和结构,那么理解Akka Distribted Data就可以得心应手了。区别于Akka Sharding,distributed-data中的Replicator是API提供的,所以直接拿来用就可以了。

副本(Replicator),声明如下:

import akka.cluster.ddata._
 
implicit val cluster = Cluster(context.system)
 
val replicator: ActorRef = DistributedData(context.system).replicator

Akka Distributed Data实际上相当于Redis的Key-Value存储,并且一致性都是可调的。由于Key的特殊性,它是Value类型的编码形式。

val Key = ORMapKey.create[String, StoredOrder]("orders")

这里的Key变量以大写开头,允许用match表达式匹配。这是Scala语言规范,

8.1.1 Variable Patterns

[...]

A variable pattern x is a simple identifier which starts with a *lower case* letter. It matches any value, and binds the variable name to that value. [...]

Replicator实际上是一个简单的Actor,我们通过消息协议进行副本的通讯。例如要处理更新操作,让Replicator发送Replicator.Update消息,

Replicator

def storeOrderValidation(id: OrderIdentifier, storedOrder: StoredOrder, request: StoreOrderValidation) = { 
  replicator ! Replicator.Update(
    key = Key, 
    initial = ORMap.empty[String, StoredOrder], 
    writeConsistency = Replicator.WriteMajority(5.seconds), 
    request = Some(request) 
  ) { orders => 
    orders + (id.i.toString -> storedOrder)
  }
}

 副本需要知会Key是什么

 分布式数据初始化值,这里定义了Key对应的Value是空的。Data Type为ORMap[String, StoredOrder]

 写一致性,它可以是WriteLocalWriteTo(number of nodes)WriteMajority以及WriteAll。这里我们使用WriteMajority。表示立即写入N/2 + 1个节点(N是集群中节点的个数)

 可选的。该对象附带更新请求一并传递。

 这里是一个函数,作用于要修改的值。例如这里的数据是添加一个新的StoredOrder到map中。

object Update {
  def apply[A <: ReplicatedData](
    key: Key[A], initial: A, writeConsistency: WriteConsistency,
    request: Option[Any] = None)(modify: A ? A): Update[A]
}

整个过程就是,Update消息携带的Key,通过modify函数,写入ValueORMap中。那么接下来会出现3种情况:

  • 更新成功
  • 更新全部失败
  • 介于两者之间,部分更新成功

如果所有都按计划执行,将返回一个UpdateSuccess消息,表示更新成功。

case Replicator.UpdateSuccess(Key, Some(request: StoreOrderValidation)) =>
  request.replyTo ! OrderValidationStored(request.id, request.order)

或者更新失败,返回得到一个ModifyFailure

最后一种情况,“我们不确定是否成功,但它确实运行了”,这时包含有UpdateTimeoutStoreFailure

UpdateTimeout的出现,取决于事先定义的一致性等级。例如这里定义的是Replicator.WriteMajority(5.seconds),表示主节点在5秒的时间内没有应答。这种情况出现,可能是其它几个节点处理消息非常慢、或者网络延迟等等因素。如何处理这种情况取决于真实的案例。

StoreFailure,表示本地持久化存储出现了问题。这种问题出现之前,需先额外定义是否进行本地的持久化。

一个非常实用的操作是,我们可以监听分布式数据的改变。

replicator ! Replicator.Subscribe(OrderStorage.Key, self)

当发生改变时,会得到一个Replicator.Changed消息:

case change @ Replicator.Changed(OrderStorage.Key) =>
  val allOrders: Map[String, StoredOrder] = change.get(OrderStorage.Key).entries
  // do something with the orders
对于 Update,一致性等级大概有:
  • WriteLocal the value will immediately only be written to the local replica, and later disseminated with gossip
  • WriteTo(n) the value will immediately be written to at least n replicas, including the local replica
  • WriteMajority the value will immediately be written to a majority of replicas, i.e. at least N/2 + 1 replicas, where N is the number of nodes in the cluster (or cluster role group)
  • WriteAll the value will immediately be written to all nodes in the cluster (or all nodes in the cluster role group)
对于 Get,一致性等级有:
  • ReadLocal the value will only be read from the local replica
  • ReadFrom(n) the value will be read and merged from n replicas, including the local replica
  • ReadMajority the value will be read and merged from a majority of replicas, i.e. at least N/2 + 1 replicas, where N is the number of nodes in the cluster (or cluster role group)
  • ReadAll the value will be read and merged from all nodes in the cluster (or all nodes in the cluster role group)

前面说过,Value的实现是一个CvRDT,它的定义为:

/**
 * Interface for implementing a state based convergent
 * replicated data type (CvRDT).
 * [...]
 **/
trait ReplicatedData {
  /**
   * The type of the concrete implementation, e.g. `GSet[A]`.
   * To be specified by subclass.
   */
  type T <: ReplicatedData
 
  /**
   * Monotonic merge function.
   */
  def merge(that: T): T
 
}

该数据结构要求有merge函数,用于合并其它副本传递过来的数据。

object ORMapKey {
  def create[A, B <: ReplicatedData](id: String): Key[ORMap[A, B]] = ORMapKey(id)
}

该CvRDT被定义为Data Type函数的上界,以实现存储功能。

你会发现,merge函数返回一个T ,不是Option[T]也不是Try[T]——它证明了**merge总是正常的**。你会发现,merge函数是一个单调函数,总是向一个方向增长。

Akka api提供了一些基础数据类型,用于实现我们的分布式数据的存储。

对于上面的例子,可以直接操作数据,因为它本身就是个Key-Value,下面使用分布式方案实现基于内存的存储实现。

import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.LWWMap
import akka.cluster.ddata.LWWMapKey

object ReplicatedCache {

  def props: Props = Props[ReplicatedCache]

  private final case class Request(key: String, replyTo: ActorRef)

  final case class PutInCache(key: String, value: Any)
  final case class GetFromCache(key: String)
  final case class Cached(key: String, value: Option[Any])
  final case class Evict(key: String)
}

class ReplicatedCache extends Actor {
  import akka.cluster.ddata.Replicator._
  import ReplicatedCache._

  val replicator = DistributedData(context.system).replicator
  implicit val cluster = Cluster(context.system)

  def dataKey(entryKey: String): LWWMapKey[String, Any] =
    LWWMapKey("cache-" + math.abs(entryKey.hashCode) % 100)

  def receive = {
    case PutInCache(key, value) =>
      replicator ! Update(dataKey(key), LWWMap(), WriteLocal)(_ + (key -> value))
    case Evict(key) =>
      replicator ! Update(dataKey(key), LWWMap(), WriteLocal)(_ - key)
    case GetFromCache(key) =>
      replicator ! Get(dataKey(key), ReadLocal, Some(Request(key, sender())))
    case g @ GetSuccess(LWWMapKey(_), Some(Request(key, replyTo))) =>
      g.dataValue match {
        case data: LWWMap[_, _] => data.asInstanceOf[LWWMap[String, Any]].get(key) match {
          case Some(value) => replyTo ! Cached(key, Some(value))
          case None        => replyTo ! Cached(key, None)
        }
      }
    case NotFound(_, Some(Request(key, replyTo))) =>
      replyTo ! Cached(key, None)
    case _: UpdateResponse[_] => // ok
  }

}

TDD案例:

import scala.concurrent.duration._
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.Replicator.GetReplicaCount
import akka.cluster.ddata.Replicator.ReplicaCount
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory

object ReplicatedCacheSpec extends MultiNodeConfig {
  val node1 = role("node-1")
  val node2 = role("node-2")
  val node3 = role("node-3")

  commonConfig(ConfigFactory.parseString("""
    akka.loglevel = INFO
    akka.actor.provider = "cluster"
    akka.log-dead-letters-during-shutdown = off
    """))

}

class ReplicatedCacheSpecMultiJvmNode1 extends ReplicatedCacheSpec
class ReplicatedCacheSpecMultiJvmNode2 extends ReplicatedCacheSpec
class ReplicatedCacheSpecMultiJvmNode3 extends ReplicatedCacheSpec

class ReplicatedCacheSpec extends MultiNodeSpec(ReplicatedCacheSpec) with STMultiNodeSpec with ImplicitSender {
  import ReplicatedCacheSpec._
  import ReplicatedCache._

  override def initialParticipants = roles.size

  val cluster = Cluster(system)
  val replicatedCache = system.actorOf(ReplicatedCache.props)

  def join(from: RoleName, to: RoleName): Unit = {
    runOn(from) {
      cluster join node(to).address
    }
    enterBarrier(from.name + "-joined")
  }

  "Demo of a replicated cache" must {
    "join cluster" in within(20.seconds) {
      join(node1, node1)
      join(node2, node1)
      join(node3, node1)

      awaitAssert {
        DistributedData(system).replicator ! GetReplicaCount
        expectMsg(ReplicaCount(roles.size))
      }
      enterBarrier("after-1")
    }

    "replicate cached entry" in within(10.seconds) {
      runOn(node1) {
        replicatedCache ! PutInCache("key1", "A")
      }

      awaitAssert {
        val probe = TestProbe()
        replicatedCache.tell(GetFromCache("key1"), probe.ref)
        probe.expectMsg(Cached("key1", Some("A")))
      }

      enterBarrier("after-2")
    }

    "replicate many cached entries" in within(10.seconds) {
      runOn(node1) {
        for (i ← 100 to 200)
          replicatedCache ! PutInCache("key" + i, i)
      }

      awaitAssert {
        val probe = TestProbe()
        for (i ← 100 to 200) {
          replicatedCache.tell(GetFromCache("key" + i), probe.ref)
          probe.expectMsg(Cached("key" + i, Some(i)))
        }
      }

      enterBarrier("after-3")
    }

    "replicate evicted entry" in within(15.seconds) {
      runOn(node1) {
        replicatedCache ! PutInCache("key2", "B")
      }

      awaitAssert {
        val probe = TestProbe()
        replicatedCache.tell(GetFromCache("key2"), probe.ref)
        probe.expectMsg(Cached("key2", Some("B")))
      }
      enterBarrier("key2-replicated")

      runOn(node3) {
        replicatedCache ! Evict("key2")
      }

      awaitAssert {
        val probe = TestProbe()
        replicatedCache.tell(GetFromCache("key2"), probe.ref)
        probe.expectMsg(Cached("key2", None))
      }

      enterBarrier("after-4")
    }

    "replicate updated cached entry" in within(10.seconds) {
      runOn(node2) {
        replicatedCache ! PutInCache("key1", "A2")
        replicatedCache ! PutInCache("key1", "A3")
      }

      awaitAssert {
        val probe = TestProbe()
        replicatedCache.tell(GetFromCache("key1"), probe.ref)
        probe.expectMsg(Cached("key1", Some("A3")))
      }

      enterBarrier("after-5")
    }

  }

}

性能优化

如果没有做性能优化,将会是萦绕我们夜晚的致命罪孽。在生产环境,我们需要做:

  • 自定义的Data Type需要实现序列化
  • 实现delta-CRDTs避免更新发送full state
  • 删除完成的记录

序列化是性能优化的关键,Java自身的序列方案并不是最优的。建议使用kryo或谷歌的protobuf

delta-CRDT主要用于减少full state Update的发送,它表示以一定的顺序传播更新。

因为是冲突自由(Conflict Free)的,在使用诸如ORMap时,如果并发地添加和删除一个条目,添加会获胜。你不能删除一个不存在的条目。这会引入一个问题,例如某个节点删除订单的同时,另外一个节点添加订单,添加订单被执行。原来的订单没有删除,所以需要额外修剪未移除的已经“完成”的条目。

总结,在副本同步操作时,有些方面并不完全正确的:

  • 对于OrderHandler是没有副本机制的——如果某个节点故障,所有当前处理的Update消息将停留在地域边缘,客户端也不会有任何响应。
  • 不要将东西持久化存储!现实中这是相当疯狂的事情!(设想副本在不断地写入、不断地更新同步)
  1. https://medium.com/@istanbul_techie/a-look-at-conflict-free-replicated-data-types-crdt-221a5f629e7e ↩
  2. https://hal.inria.fr/inria-00609399v1 ↩
  3. https://hal.inria.fr/inria-00609399v1/document ↩
  4. https://doc.akka.io/docs/akka/current/cluster-sharding.html ↩



回帖