Я работаю над сервисом, который использует Akka Persistence для поиска событий. До сих пор мы успешно хранили события в журнале Cassandra. Теперь мы хотим внедрить CQRS, используя Akka Persistence Query. В качестве первого подхода мы пытаемся следовать шаблону синглтона кластера, чтобы актер передавал хранимые события по тегам. На данный момент у нас есть этот довольно простой актер, который будет упакован как синглтон:
public class EventProcessor extends AbstractLoggingActor {
private static final Logger LOG = LoggerFactory.getLogger(EventProcessor.class);
private final CassandraReadJournal journal;
public EventProcessor(ActorSystem system) {
journal =
PersistenceQuery.get(system)
.getReadJournalFor(CassandraReadJournal.class, CassandraReadJournal.Identifier());
journal
.eventsByTag(OnBoardingBehavior.ENTITY_TYPE_KEY.name(), Offset.noOffset())
.map(EventEnvelope::persistenceId)
.to(Sink.foreach(this::logMessage))
.run(system);
}
private void logMessage(String id) {
LOG.info(String.format("########## Received persistenceId %s", id));
}
@Override
public Receive createReceive() {
return null;
}
}
И вот как мы заключаем актера в опекуна:
akka.actor.ActorSystem classicSystem = context.getSystem().classicSystem();
ClusterSingletonManagerSettings settings =
ClusterSingletonManagerSettings.create(classicSystem);
Props clusterSingletonManagerProps =
ClusterSingletonManager.props(
Props.create(EventProcessor.class, classicSystem),
PoisonPill.getInstance(),
settings);
classicSystem.actorOf(clusterSingletonManagerProps, "clusterSingletonManager");
Когда мы запускаем сервис мы получаем следующее исключение (в строке actorOf):
java.lang.UnsupportedOperationException: cannot create top-level actor [clusterSingletonManager] from the outside on ActorSystem with custom user guardian
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:900)
at r.service.onboarding.actor.Guardian.initializeEventProcessor(Guardian.java:95)
at r.service.onboarding.actor.Guardian.<init>(Guardian.java:56)
at r.service.onboarding.actor.Guardian.lambda$create$745d95f3$1(Guardian.java:66)
at akka.actor.typed.javadsl.Behaviors$.$anonfun$setup$1(Behaviors.scala:47)
at a.a.t.i.BehaviorImpl$DeferredBehavior$$anon$1.apply(BehaviorImpl.scala:118)
at akka.actor.typed.Behavior$.start(Behavior.scala:168)
at a.a.t.i.InterceptorImpl$$anon$1.start(InterceptorImpl.scala:48)
at akka.actor.typed.BehaviorInterceptor.aroundStart(BehaviorInterceptor.scala:55)
at a.a.typed.internal.InterceptorImpl.preStart(InterceptorImpl.scala:71)
at a.a.typed.internal.InterceptorImpl$.$anonfun$apply$1(InterceptorImpl.scala:28)
at a.a.t.i.BehaviorImpl$DeferredBehavior$$anon$1.apply(BehaviorImpl.scala:118)
at akka.actor.typed.Behavior$.start(Behavior.scala:168)
at akka.actor.typed.Behavior$.interpret(Behavior.scala:275)
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
at a.a.t.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:126)
at a.a.t.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:106)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
at akka.actor.ActorCell.invoke(ActorCell.scala:543)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
at akka.dispatch.Mailbox.run(Mailbox.scala:230)
at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
... 5 frames excluded
Я хотел бы упомянуть, что мы можем порождать другого актера из того же опекуна, как:
ActorRef<Command> actorRef =
context.spawn(OnBoardingBehavior.create(uuid), "OnBoardingBehavior-" + uuid);
Я совершенно новичок в Akka, поэтому любая помощь будет высоко ценится!