Я хочу сделать простой пример с распределенными данными, проверенными по этой ссылке в документации: https://doc.akka.io/docs/akka/current/typed/distributed-data.html
Я использую кластер с двумя системами. Проблема в том, что я не могу видеть данные об актерах разных систем.
Я совершенно новичок в АККА и надеюсь, что не совершу слишком много ошибок. Однако это мой код:
1) SimpleActor
public interface Command {}
public static class Increment implements Command {}
public static class GetValue implements Command {}
public enum InternalResponse implements SimpleActor.Command { STUB }
private static class InternalGetResponse implements SimpleActor.Command {
final Replicator.GetResponse<GCounter> rsp;
final ActorRef<SimpleActor.Command> replyTo;
InternalGetResponse(Replicator.GetResponse<GCounter> rsp, ActorRef<SimpleActor.Command> replyTo) {
this.rsp = rsp;
this.replyTo = replyTo;
}
}
private static final class InternalSubscribeResponse implements SimpleActor.Command {
final Replicator.SubscribeResponse<GCounter> rsp;
InternalSubscribeResponse(Replicator.SubscribeResponse<GCounter> rsp) {
this.rsp = rsp;
}
}
private final ActorContext<Command> context;
private final ReplicatorMessageAdapter<Command, GCounter> replicatorAdapter;
private final SelfUniqueAddress node;
private final Key<GCounter> key;
private final Logger log;
public static Behavior<Command> create(Key<GCounter> key) {
return Behaviors.setup(
ctx ->
DistributedData.withReplicatorMessageAdapter(
(ReplicatorMessageAdapter<Command, GCounter> replicatorAdapter) ->
new SimpleActor(ctx, replicatorAdapter, key)));
}
private SimpleActor(ActorContext<Command> context, ReplicatorMessageAdapter<Command, GCounter> replicator, Key<GCounter> key) {
super(context);
this.context = context;
this.replicatorAdapter = replicator;
this.key = key;
this.log = context.getLog();
this.node = DistributedData.get(context.getSystem()).selfUniqueAddress();
this.replicatorAdapter.subscribe(this.key, InternalSubscribeResponse::new);
}
@Override
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(Increment.class, message -> onIncrement())
.onMessage(GetValue.class, message -> onGetValue())
.onMessage(InternalGetResponse.class, rsp -> handlerGetResponse(rsp))
.onMessage(InternalSubscribeResponse.class, rsp -> handlerInternalSubscribeResponse(rsp))
.build();
}
private Behavior<Command> handlerInternalSubscribeResponse(InternalSubscribeResponse msg) {
if (msg.rsp instanceof Replicator.Changed) {
GCounter counter = ((Replicator.Changed<?>) msg.rsp).get(key);
int value = counter.getValue().intValue();
log.info("handlerInternalSubscribeResponse {}, value: "+value);
return this;
} else {
// no deletes
return Behaviors.unhandled();
}
}
private Behavior<Command> handlerGetResponse(InternalGetResponse msg) {
if (msg.rsp instanceof Replicator.GetSuccess) {
int value = ((Replicator.GetSuccess<?>) msg.rsp).get(key).getValue().intValue();
//msg.replyTo.tell(value);
log.info("handlerGetResponse {}: value: "+value);
return this;
} else {
// not dealing with failures
log.info("handlerGetResponse {}: InternalGetResponse.rsp it is NOT istance of Replicator.GetSuccess => value: 0");
return Behaviors.unhandled();
}
}
private Behavior<Command> onGetValue() {
log.info("onGetValue {}");
replicatorAdapter.askGet(askReplyTo -> {
return new Replicator.Get<>(key, Replicator.readLocal(), askReplyTo);
}, msg -> {
log.info("onGetValue {} MSG: "+msg.key());
return new InternalGetResponse(msg, context.getSelf());
});
return this;
}
private Behavior<Command> onIncrement() {
log.info("OnIncrement {}");
replicatorAdapter.askUpdate(
askReplyTo ->
new Replicator.Update<>(
key,
GCounter.empty(),
Replicator.writeLocal(),
askReplyTo,
curr -> curr.increment(node, 1)),
msg -> {
log.info("onIncrement {} MSG: "+msg.key());
return InternalResponse.STUB;
});
return this;
}
2) Main:
private static Config clusterConfig =
ConfigFactory.parseString(
"akka { \n"
+ " actor.provider = cluster \n"
+ " remote.artery { \n"
+ " canonical { \n"
+ " hostname = \"127.0.0.1\" \n"
+ " port = 2551 \n"
+ " } \n"
+ " } \n"
+ "} \n");
private static Config noPort =
ConfigFactory.parseString(
" akka.remote.classic.netty.tcp.port = 0 \n"
+ " akka.remote.artery.canonical.port = 0 \n");
public static void main(String[] args) throws InterruptedException {
final Key<GCounter> key = new Key<GCounter>("KeyData") {
private static final long serialVersionUID = 4984217808899642045L;
};
ActorSystem<Object> system =
ActorSystem.create(Behaviors.empty(), "ClusterSystem", noPort.withFallback(clusterConfig));
ActorRef<com.exerciseCluster.distributedData.SimpleActor.Command> counter1 = system.systemActorOf(SimpleActor.create(key), "PING", Props.empty());
ActorSystem<Object> system2 =
ActorSystem.create(Behaviors.empty(), "ClusterSystem", noPort.withFallback(clusterConfig));
ActorRef<com.exerciseCluster.distributedData.SimpleActor.Command> counter2 = system2.systemActorOf(SimpleActor.create(key), "PING", Props.empty());
counter1.tell(new SimpleActor.GetValue());
counter1.tell(new SimpleActor.Increment());
counter1.tell(new SimpleActor.GetValue());
counter1.tell(new SimpleActor.Increment());
counter1.tell(new SimpleActor.GetValue());
counter1.tell(new SimpleActor.Increment());
counter1.tell(new SimpleActor.GetValue());
counter1.tell(new SimpleActor.Increment());
counter1.tell(new SimpleActor.GetValue());
// counter2
Thread.sleep(1000);
counter2.tell(new SimpleActor.GetValue());
}
С актором, который увеличивает значение, я вижу изменения. С последним актером (counter2) я не вижу обновленного значения. Как я могу решить это?