Почему субъекты в одном кластере не могут видеть обновленное значение с помощью DistributedData - PullRequest
0 голосов
/ 17 марта 2020

Я хочу сделать простой пример с распределенными данными, проверенными по этой ссылке в документации: https://doc.akka.io/docs/akka/current/typed/distributed-data.html

Я использую кластер с двумя системами. Проблема в том, что я не могу видеть данные об актерах разных систем.

Я совершенно новичок в АККА и надеюсь, что не совершу слишком много ошибок. Однако это мой код:

1) SimpleActor

public interface Command {}
public static class Increment implements Command {}
public static class GetValue implements Command {}
public enum InternalResponse implements SimpleActor.Command { STUB }

private static class InternalGetResponse implements SimpleActor.Command {
    final Replicator.GetResponse<GCounter> rsp;
    final ActorRef<SimpleActor.Command> replyTo;

    InternalGetResponse(Replicator.GetResponse<GCounter> rsp, ActorRef<SimpleActor.Command> replyTo) {
        this.rsp = rsp;
        this.replyTo = replyTo;
    }
}
private static final class InternalSubscribeResponse implements SimpleActor.Command {
    final Replicator.SubscribeResponse<GCounter> rsp;

    InternalSubscribeResponse(Replicator.SubscribeResponse<GCounter> rsp) {
        this.rsp = rsp;
    }
}

private final ActorContext<Command> context;
private final ReplicatorMessageAdapter<Command, GCounter> replicatorAdapter;
private final SelfUniqueAddress node;
private final Key<GCounter> key;
private final Logger log;

public static Behavior<Command> create(Key<GCounter> key) {
    return Behaviors.setup(
              ctx ->
                  DistributedData.withReplicatorMessageAdapter(
                      (ReplicatorMessageAdapter<Command, GCounter> replicatorAdapter) ->
                          new SimpleActor(ctx, replicatorAdapter, key)));
}

private SimpleActor(ActorContext<Command> context, ReplicatorMessageAdapter<Command, GCounter> replicator, Key<GCounter> key) {
    super(context);
    this.context = context;
    this.replicatorAdapter = replicator;
    this.key = key;
    this.log = context.getLog();
    this.node = DistributedData.get(context.getSystem()).selfUniqueAddress();

    this.replicatorAdapter.subscribe(this.key, InternalSubscribeResponse::new);
}



@Override
public Receive<Command> createReceive() {
    return newReceiveBuilder()
            .onMessage(Increment.class, message -> onIncrement())
            .onMessage(GetValue.class, message -> onGetValue())
            .onMessage(InternalGetResponse.class, rsp -> handlerGetResponse(rsp))
            .onMessage(InternalSubscribeResponse.class, rsp -> handlerInternalSubscribeResponse(rsp))
            .build();
}

private Behavior<Command> handlerInternalSubscribeResponse(InternalSubscribeResponse msg) {
    if (msg.rsp instanceof Replicator.Changed) {
        GCounter counter = ((Replicator.Changed<?>) msg.rsp).get(key);
        int value = counter.getValue().intValue();
        log.info("handlerInternalSubscribeResponse {}, value: "+value);
        return this;
      } else {
        // no deletes
        return Behaviors.unhandled();
      }
}

private Behavior<Command> handlerGetResponse(InternalGetResponse msg) {
    if (msg.rsp instanceof Replicator.GetSuccess) {
        int value = ((Replicator.GetSuccess<?>) msg.rsp).get(key).getValue().intValue();
        //msg.replyTo.tell(value);
        log.info("handlerGetResponse {}: value: "+value);
        return this;
    } else {
        // not dealing with failures
        log.info("handlerGetResponse {}: InternalGetResponse.rsp it is NOT istance of Replicator.GetSuccess => value: 0");
        return Behaviors.unhandled();
    }
}

private Behavior<Command> onGetValue() {
    log.info("onGetValue {}");
    replicatorAdapter.askGet(askReplyTo -> {
            return new Replicator.Get<>(key, Replicator.readLocal(), askReplyTo);
        }, msg -> {
            log.info("onGetValue {} MSG: "+msg.key());
            return new InternalGetResponse(msg, context.getSelf());
        });
    return this;
}

private Behavior<Command> onIncrement() {
    log.info("OnIncrement {}");
    replicatorAdapter.askUpdate(
              askReplyTo ->
                  new Replicator.Update<>(
                      key,
                      GCounter.empty(),
                      Replicator.writeLocal(),
                      askReplyTo,
                      curr -> curr.increment(node, 1)),
              msg -> {
                  log.info("onIncrement {} MSG: "+msg.key());
                  return InternalResponse.STUB;
              });
    return this;
}

2) Main:

private static Config clusterConfig =
          ConfigFactory.parseString(
              "akka { \n"
                  + "  actor.provider = cluster \n"
                  + "  remote.artery { \n"
                  + "    canonical { \n"
                  + "      hostname = \"127.0.0.1\" \n"
                  + "      port = 2551 \n"
                  + "    } \n"
                  + "  } \n"
                  + "}  \n");

      private static Config noPort =
          ConfigFactory.parseString(
              "      akka.remote.classic.netty.tcp.port = 0 \n"
                  + "      akka.remote.artery.canonical.port = 0 \n");

      public static void main(String[] args) throws InterruptedException {
          final Key<GCounter> key = new Key<GCounter>("KeyData") {
            private static final long serialVersionUID = 4984217808899642045L;
        };

          ActorSystem<Object> system =
                  ActorSystem.create(Behaviors.empty(), "ClusterSystem", noPort.withFallback(clusterConfig));
          ActorRef<com.exerciseCluster.distributedData.SimpleActor.Command> counter1 = system.systemActorOf(SimpleActor.create(key), "PING", Props.empty());
          ActorSystem<Object> system2 =
                  ActorSystem.create(Behaviors.empty(), "ClusterSystem", noPort.withFallback(clusterConfig));
          ActorRef<com.exerciseCluster.distributedData.SimpleActor.Command> counter2 = system2.systemActorOf(SimpleActor.create(key), "PING", Props.empty());
          counter1.tell(new SimpleActor.GetValue());
          counter1.tell(new SimpleActor.Increment());
          counter1.tell(new SimpleActor.GetValue());
          counter1.tell(new SimpleActor.Increment());
          counter1.tell(new SimpleActor.GetValue());
          counter1.tell(new SimpleActor.Increment());
          counter1.tell(new SimpleActor.GetValue());
          counter1.tell(new SimpleActor.Increment());
          counter1.tell(new SimpleActor.GetValue());

          // counter2
          Thread.sleep(1000);
          counter2.tell(new SimpleActor.GetValue());



      }

С актором, который увеличивает значение, я вижу изменения. С последним актером (counter2) я не вижу обновленного значения. Как я могу решить это?

...