не может создать актера верхнего уровня [clusterSingletonManager] извне в ActorSystem с пользовательским опекуном пользователя - PullRequest
0 голосов
/ 01 апреля 2020

Я работаю над сервисом, который использует Akka Persistence для поиска событий. До сих пор мы успешно хранили события в журнале Cassandra. Теперь мы хотим внедрить CQRS, используя Akka Persistence Query. В качестве первого подхода мы пытаемся следовать шаблону синглтона кластера, чтобы актер передавал хранимые события по тегам. На данный момент у нас есть этот довольно простой актер, который будет упакован как синглтон:

public class EventProcessor extends AbstractLoggingActor {
  private static final Logger LOG = LoggerFactory.getLogger(EventProcessor.class);

  private final CassandraReadJournal journal;

  public EventProcessor(ActorSystem system) {
    journal =
        PersistenceQuery.get(system)
            .getReadJournalFor(CassandraReadJournal.class, CassandraReadJournal.Identifier());

    journal
        .eventsByTag(OnBoardingBehavior.ENTITY_TYPE_KEY.name(), Offset.noOffset())
        .map(EventEnvelope::persistenceId)
        .to(Sink.foreach(this::logMessage))
        .run(system);
  }

  private void logMessage(String id) {
    LOG.info(String.format("########## Received persistenceId %s", id));
  }

  @Override
  public Receive createReceive() {
    return null;
  }
}

И вот как мы заключаем актера в опекуна:

    akka.actor.ActorSystem classicSystem = context.getSystem().classicSystem();

    ClusterSingletonManagerSettings settings =
        ClusterSingletonManagerSettings.create(classicSystem);

    Props clusterSingletonManagerProps =
        ClusterSingletonManager.props(
            Props.create(EventProcessor.class, classicSystem),
            PoisonPill.getInstance(),
            settings);

    classicSystem.actorOf(clusterSingletonManagerProps, "clusterSingletonManager");

Когда мы запускаем сервис мы получаем следующее исключение (в строке actorOf):

java.lang.UnsupportedOperationException: cannot create top-level actor [clusterSingletonManager] from the outside on ActorSystem with custom user guardian
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:900)
at r.service.onboarding.actor.Guardian.initializeEventProcessor(Guardian.java:95)
at r.service.onboarding.actor.Guardian.<init>(Guardian.java:56)
at r.service.onboarding.actor.Guardian.lambda$create$745d95f3$1(Guardian.java:66)
at akka.actor.typed.javadsl.Behaviors$.$anonfun$setup$1(Behaviors.scala:47)
at a.a.t.i.BehaviorImpl$DeferredBehavior$$anon$1.apply(BehaviorImpl.scala:118)
at akka.actor.typed.Behavior$.start(Behavior.scala:168)
at a.a.t.i.InterceptorImpl$$anon$1.start(InterceptorImpl.scala:48)
at akka.actor.typed.BehaviorInterceptor.aroundStart(BehaviorInterceptor.scala:55)
at a.a.typed.internal.InterceptorImpl.preStart(InterceptorImpl.scala:71)
at a.a.typed.internal.InterceptorImpl$.$anonfun$apply$1(InterceptorImpl.scala:28)
at a.a.t.i.BehaviorImpl$DeferredBehavior$$anon$1.apply(BehaviorImpl.scala:118)
at akka.actor.typed.Behavior$.start(Behavior.scala:168)
at akka.actor.typed.Behavior$.interpret(Behavior.scala:275)
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
at a.a.t.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:126)
at a.a.t.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:106)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
at akka.actor.ActorCell.invoke(ActorCell.scala:543)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
at akka.dispatch.Mailbox.run(Mailbox.scala:230)
at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
... 5 frames excluded

Я хотел бы упомянуть, что мы можем порождать другого актера из того же опекуна, как:

ActorRef<Command> actorRef =
        context.spawn(OnBoardingBehavior.create(uuid), "OnBoardingBehavior-" + uuid);

Я совершенно новичок в Akka, поэтому любая помощь будет высоко ценится!

1 Ответ

1 голос
/ 01 апреля 2020

Наконец-то разобрался. Очевидно, что при работе в типизированной системе я не могу использовать старый нетипизированный подход с ClusterSingletonManager.props. Я нашел здесь правильный способ для типизированной системы: https://doc.akka.io/docs/akka/current/typed/cluster-singleton.html

Итак, мой актер теперь AbstractBehavior:

public class EventProcessor extends AbstractBehavior<Void>

И вот как я оборачиваюсь это как синглтон:

    ClusterSingleton singleton = ClusterSingleton.get(context.getSystem());

    singleton.init(
        SingletonActor.of(EventProcessor.create(identityRequestAdapter), "eventProcessor"));
...