集群分片
集群分片可以用于,当你想要 Actor 分布到集群的多个节点上,并通过它们的逻辑标识符来与它们交互,而不需要关心它们在集群中的物理位置,而这些物理位置也会随着时间被改变(迁移)。
比如 Actor 可以用于表示 DDD(Domain-Driven Design) 术语中的 聚合根(Aggregate Roots)。这时我们称这些 Actor 为“实体”。这些 Actor 通常拥有(坚固的)持久化状态,但是该特性并不局限于拥有持久化状态的 Actor。
集群分片通常用于有拥有很多有状态的 Actor,它们更适合一起消耗更多的资源而不是在单个机器上。如果你仅仅拥有很少几个有状态的 Actor,则让它们运行在 集群单例 节点上则更为简便。
在这里分片表示带有标识符的 Actor,因此成为实体,可以被自动分发到集群中的多个节点上。每个实体 Actor 仅会在一个地方运行,并且可以在发送者不知道目的 Actor 位置的情况下向其发送消息。这是通过一个由扩展提供的ShardRegion发送消息来实现的,它知道如何通过实体 ID 将消息路由到最终的目的地。
如果开启了对应特性,集群中那些 WeaklyUp 状态的成员将不会激活集群分片。
警告
集群分片不能和 Automatic Downing 一起使用,因为它会将集群拆分成两个单独的集群,从而导致在每个单独的集群中多个分片和实体被启动。查看 Downing。
一个例子
实体 Actor 看起来会是这样:
case object Increment
case object Decrement
final case class Get(counterid: Long)
final case class EntityEnvelope(id:Long, payload:Any)
case object Stop
final case class CounterChanged(delta:Int)
class Counter extends PersistentActor {
import ShardRegion.Passivate
context.setReceiveTimeout(120.seconds)
// self.path.name is the entity identifier (utf-8 URL-encoded)
override def persistenceId:String = "Counter-" + self.name
var count = 0
def updateState(event:CounterChanged):Unit =
count += event.delta
override def receiveCommand:Receive = {
case Increment => persist(CounterChanged(+1))(updateState)
case Decrement => persist(CounterChanged(-1))(updateState)
case Get(_) => sender ! count
case ReceiveTimeout = context.parent ! Passivate(stopMessage = Stop)
case Stop => context.stop(self)
}
}
上面的 Actor 使用了PersistentActor来保存状态来实现事件溯源。它并非必须是一个持久化 Actor,但是对于处理失败和节点间的实体迁移,如果它是有价值的则必须能够恢复其状态。
注意persistenceId是如何被定义的。Actor 的名字在这里是标识符(utf-8 URL-encoded)。或者你可以以其他的方式来定义它,但必须是唯一的。
在使用分片扩展之前你首先应该通过ClusterSharding.start方法对需要支持的实体类型进行注册,通常是在集群中每个节点的系统启动的时候。ClusterSharding.start会给你一个可以用来传递的引用。
val counterRegion:ActoRef = ClusterSharding(system).start(
typeName = "Counter",
entityProps = Props[Counter],
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId
)
extractEntityId和extractShardId是两个由应用程序指定的函数,用于从传入的消息中解析实体标示符和分片标识符。
val extractEntityId:ShardRegin.ExtractEntityId = {
case EntityEnvelpoe(id, payload) => (id.tostring, payload)
case msg @ Get(id) => (id.toString, msg)
}
val numberOfShards = 100
val extractShardId: ShardRegion.ExtractShardId = {
case EntityEnvelope(id, _) => (id % numberOfShards).toString
case Get(id) => (id % numberOfShards).toString
}
这个例子展示了在消息中定义标示符的两种方式:
Get消息本身已经包含了标示符- 自定义一个
EntityEnvelope专门用于持有标示符,实际要发送的消息则被包装在其中
注意上面的extractEntityId函数是如何处理这两种类型的消息的。extractEntityId会返回一个元组,元组的第二部分则是发送给实体 Actor 的消息,这样一来也可以根据需要将信封解包。
一个分片是一个被一起管理的实体组,而分组则是通过上面展示的extractShardId函数定义的。对于一个特定的实体标识符,其分片标识符总是会相同的。
创建一个好的分片算法其本身就是一个很有意思的挑战。尝试生成一个均匀的分布,比如,使每个分片的实体总数相同。作为一个经验法则,分片的数量应该是已规划的节点最大数量的十倍以上(???)。分片数量小于节点数量将会导致一些节点不会持有任何分片,而太多的分片将会导致分片的管理变的低效,比如,再平衡(rebalance)的开销,因为协调器(coordinator)会涉及到每个分片第一条消息的路由,从而增加延迟。分片算法应该在一个运行中的每个节点上保持一致。可以在集群中的所有节点关闭后进行修改。
有个简单的分片算法可以适用于大多数场景,取实体标识符的哈希绝对值并用分片数量取模(math.abs(id.hashCode()) % shardNum)。为了方便,已经由ShardRegion.HashCodeMessageExtractor提供。
给实体的消息总是通过本地的ShardRegion发送。ShardRegion Actor 作为一个被命名的实体类型的引用,由ClusterSharding.start返回,或者通过ClusterSharding.shardRegion进行检索。如果ShardRegion不知道一个实体的分片位置,则会进行查找。它会将消息指派到正确的节点并按需创建实体 Actor,比如,当指定实体的第一个消息到达时进行创建。
val conterRegion:ActorRef = ClusterSharding(system).shardRegion("Counter")
counterRegion ! Get(123)
expectMsg(0)
counterRegion ! EntityEnvelope(123, Increment)
counterRegion ! Get(123)
expectMsg(1)
一个更全面的例子可以在 Lightbend Activator 教程中找到,名为 Akka Cluster Sharding with Scala!。