集群分片

集群分片可以用于,当你想要 Actor 分布到集群的多个节点上,并通过它们的逻辑标识符来与它们交互,而不需要关心它们在集群中的物理位置,而这些物理位置也会随着时间被改变(迁移)。

比如 Actor 可以用于表示 DDD(Domain-Driven Design) 术语中的 聚合根(Aggregate Roots)。这时我们称这些 Actor 为“实体”。这些 Actor 通常拥有(坚固的)持久化状态,但是该特性并不局限于拥有持久化状态的 Actor。

集群分片通常用于有拥有很多有状态的 Actor,它们更适合一起消耗更多的资源而不是在单个机器上。如果你仅仅拥有很少几个有状态的 Actor,则让它们运行在 集群单例 节点上则更为简便。

在这里分片表示带有标识符的 Actor,因此成为实体,可以被自动分发到集群中的多个节点上。每个实体 Actor 仅会在一个地方运行,并且可以在发送者不知道目的 Actor 位置的情况下向其发送消息。这是通过一个由扩展提供的ShardRegion发送消息来实现的,它知道如何通过实体 ID 将消息路由到最终的目的地。

如果开启了对应特性,集群中那些 WeaklyUp 状态的成员将不会激活集群分片。

警告

集群分片不能和 Automatic Downing 一起使用,因为它会将集群拆分成两个单独的集群,从而导致在每个单独的集群中多个分片和实体被启动。查看 Downing

一个例子

实体 Actor 看起来会是这样:

case object Increment
case object Decrement
final case class Get(counterid: Long)
final case class EntityEnvelope(id:Long, payload:Any)

case object Stop
final case class CounterChanged(delta:Int)

class Counter extends PersistentActor {
  import ShardRegion.Passivate

  context.setReceiveTimeout(120.seconds)

  // self.path.name is the entity identifier (utf-8 URL-encoded)
  override def persistenceId:String = "Counter-" + self.name

  var count = 0

  def updateState(event:CounterChanged):Unit = 
    count += event.delta

  override def receiveCommand:Receive = {
    case Increment => persist(CounterChanged(+1))(updateState)
    case Decrement => persist(CounterChanged(-1))(updateState)
    case Get(_) => sender ! count
    case ReceiveTimeout = context.parent ! Passivate(stopMessage = Stop)
    case Stop => context.stop(self)
  }
}

上面的 Actor 使用了PersistentActor来保存状态来实现事件溯源。它并非必须是一个持久化 Actor,但是对于处理失败和节点间的实体迁移,如果它是有价值的则必须能够恢复其状态。

注意persistenceId是如何被定义的。Actor 的名字在这里是标识符(utf-8 URL-encoded)。或者你可以以其他的方式来定义它,但必须是唯一的。

在使用分片扩展之前你首先应该通过ClusterSharding.start方法对需要支持的实体类型进行注册,通常是在集群中每个节点的系统启动的时候。ClusterSharding.start会给你一个可以用来传递的引用。

val counterRegion:ActoRef = ClusterSharding(system).start(
  typeName = "Counter",
  entityProps = Props[Counter],
  settings = ClusterShardingSettings(system),
  extractEntityId = extractEntityId,
  extractShardId = extractShardId
)

extractEntityIdextractShardId是两个由应用程序指定的函数,用于从传入的消息中解析实体标示符和分片标识符。

val extractEntityId:ShardRegin.ExtractEntityId = {
  case EntityEnvelpoe(id, payload) => (id.tostring, payload)
  case msg @ Get(id) => (id.toString, msg)
}

val numberOfShards = 100

val extractShardId: ShardRegion.ExtractShardId = {
  case EntityEnvelope(id, _) => (id % numberOfShards).toString
  case Get(id) => (id % numberOfShards).toString
}

这个例子展示了在消息中定义标示符的两种方式:

  • Get消息本身已经包含了标示符
  • 自定义一个EntityEnvelope专门用于持有标示符,实际要发送的消息则被包装在其中

注意上面的extractEntityId函数是如何处理这两种类型的消息的。extractEntityId会返回一个元组,元组的第二部分则是发送给实体 Actor 的消息,这样一来也可以根据需要将信封解包。

一个分片是一个被一起管理的实体组,而分组则是通过上面展示的extractShardId函数定义的。对于一个特定的实体标识符,其分片标识符总是会相同的。

创建一个好的分片算法其本身就是一个很有意思的挑战。尝试生成一个均匀的分布,比如,使每个分片的实体总数相同。作为一个经验法则,分片的数量应该是已规划的节点最大数量的十倍以上(???)。分片数量小于节点数量将会导致一些节点不会持有任何分片,而太多的分片将会导致分片的管理变的低效,比如,再平衡(rebalance)的开销,因为协调器(coordinator)会涉及到每个分片第一条消息的路由,从而增加延迟。分片算法应该在一个运行中的每个节点上保持一致。可以在集群中的所有节点关闭后进行修改。

有个简单的分片算法可以适用于大多数场景,取实体标识符的哈希绝对值并用分片数量取模(math.abs(id.hashCode()) % shardNum)。为了方便,已经由ShardRegion.HashCodeMessageExtractor提供。

给实体的消息总是通过本地的ShardRegion发送。ShardRegion Actor 作为一个被命名的实体类型的引用,由ClusterSharding.start返回,或者通过ClusterSharding.shardRegion进行检索。如果ShardRegion不知道一个实体的分片位置,则会进行查找。它会将消息指派到正确的节点并按需创建实体 Actor,比如,当指定实体的第一个消息到达时进行创建。

val conterRegion:ActorRef = ClusterSharding(system).shardRegion("Counter")
counterRegion ! Get(123)
expectMsg(0)

counterRegion ! EntityEnvelope(123, Increment)
counterRegion ! Get(123)
expectMsg(1)

一个更全面的例子可以在 Lightbend Activator 教程中找到,名为 Akka Cluster Sharding with Scala!

results matching ""

    No results matching ""