Я бы рассмотрел использование akka-cluster
и akka-cluster-sharding
. Во-первых, это дает вам пропускную способность , а также надежность. Однако это также заставит систему управлять созданием субъектов «сущности».
Но вы должны изменить способ общения с этими актерами. Вы создаете ShardRegion
актер, который обрабатывает все сообщения:
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
import akka.cluster.sharding.ShardRegion;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class MyEventReceiver extends AbstractActor {
private final ActorRef shardRegion;
public static Props props() {
return Props.create(MyEventReceiver.class, MyEventReceiver::new);
}
static ShardRegion.MessageExtractor messageExtractor
= new ShardRegion.HashCodeMessageExtractor(100) {
// using the supplied hash code extractor to shard
// the actors based on the hashcode of the entityid
@Override
public String entityId(Object message) {
if (message instanceof EventInput) {
return ((EventInput) message).uuid().toString();
}
return null;
}
@Override
public Object entityMessage(Object message) {
if (message instanceof EventInput) {
return message;
}
return message; // I don't know why they do this it's in the sample
}
};
public MyEventReceiver() {
ActorSystem system = getContext().getSystem();
ClusterShardingSettings settings =
ClusterShardingSettings.create(system);
// this is setup for the money shot
shardRegion = ClusterSharding.get(system)
.start("EventShardingSytem",
Props.create(EventActor.class),
settings,
messageExtractor);
}
@Override
public Receive createReceive() {
return receiveBuilder().match(
EventInput.class,
e -> {
log.info("Got an event with UUID {} forwarding ... ",
e.uuid());
// the money shot
deviceRegion.tell(e, getSender());
}
).build();
}
}
Итак, этот Actor MyEventReceiver
работает на всех узлах вашего кластера и инкапсулирует shardRegion
Actor. Вы больше не отправляете сообщения своим EventActor
напрямую, но, используя акторы MyEventReceiver
и deviceRegion
, вы используете систему шардинга, которая отслеживает, в каком узле кластера живет определенный EventActor
, Он создаст его, если он еще не был создан, или перенаправит сообщения, если он был создан. Каждый EventActor
должен иметь уникальный идентификатор: который извлекается из сообщения (так что UUID
довольно хорош для этого, но это может быть какой-то другой идентификатор, такой как customerID или orderID, или что-то еще, если он уникален для экземпляра Actor, с которым вы хотите его обработать).
(я опускаю код EventActor
, в остальном это довольно обычный актер, в зависимости от того, что вы делаете с ним, «магия» в приведенном выше коде).
Система шардинга автоматически знает, как создать EventActor
и присвоить его шарду на основе выбранного вами алгоритма (в данном конкретном случае она основана на hashCode
уникального идентификатора, который все Я когда-либо использовал). Кроме того, вы гарантировано только один Актер для любого данного уникального идентификатора. Сообщение прозрачно направляется на правильный узел и осколок, где бы он ни находился; от того узла и осколка, куда он отправляется.
Больше информации и примеров кода на сайте Akka и в документации.
Это довольно приятный способ убедиться, что один и тот же объект / субъект всегда обрабатывает сообщения, предназначенные для него. Кластер и сегментирование автоматически заботятся о правильном распределении акторов, об отказе и т. Д. (Вам нужно было бы добавить akka-persistence
, чтобы получить пассивацию, регидратацию и отработку отказа, если с действующим субъектом связано несколько строгих состояний (что должно быть восстановленным)).