Я столкнулся с проблемой, когда сообщения удаленным субъектам в akka-кластере имеют очень высокую задержку (10+ с) после того, как приложение некоторое время запускается. Журналы предполагают, что провайдер пытается разрешить старых действующих лиц ActorPath
s, которые не имеют ничего общего с ActorRef
, на который я отправляю сообщения. Вы можете найти выдержку из журналов в нижней части вопроса.
Мое приложение состоит из 3-х ActorSystems, которые взаимодействуют через akka-cluster
, удаленная передача настроена через artery
, я предоставлю свой applications.conf
(s) дальше вниз.
Я создаю удаленного актера в качестве дочернего для локального актера программно (, как и ), а затем снова делаю то же самое с дочерним актором (поэтомус parent-child-внук все в разных ActorSystem
s).
Если я отправляю сообщение от внука его родителю в самом начале приложения, я получаю сообщение с разумной задержкой (<10 мс). Я запускаю все три системы на одной и той же физической машине. </p>
Если я, тем не менее, запускаю (и убиваю) других участников в трех системах, задержка сообщений для таких сообщений становится непрактичной. Я думаю, что это как-то связано с одним из ActorSystem
, пытающимся разрешить ActorPath
ранее созданных (и уже мертвых) актеров. Я опубликую выдержку из журналов приложений, в которых показано, что что-то подобное происходит.
Я не понимаю, почему нужно разрешать какие-либо другие ссылки, когда я специально отправляю сообщение (удаленному) ActorRef
и был бы искренне благодарен за любую помощь или предложение.
// application.conf(s)
akka {
actor {
provider = "cluster"
serializers {
kryo = "com.twitter.chill.akka.AkkaSerializer"
}
serialization-bindings {
"java.io.Serializable" = kryo
"akka.actor.Props" = kryo
}
enable-additional-serialization-bindings = on
}
remote {
log-sent-messages = on
log-received-messages = on
log-remote-lifecycle-events = off
artery {
enabled = on
transport = tcp
canonical.hostname = "127.0.0.1" // these obv differ depending on the system
canonical.port = 7900
}
}
cluster {
seed-nodes = [ // set in AbstractSystem
"akka://experiments-master@127.0.0.1:7900"
],
# WARN Don't use auto-down feature of Akka Cluster in production.
# auto-down-unreachable-after = 10s
}
loglevel = "DEBUG"
}
// ParentActor.java
public class ParentActor extends ClusterAwareActor {
public static final String DEFAULT_NAME = "ParentActor";
public static Props props() {
return Props.create(TestActor.class, () -> new TestActor());
}
TestActor() {}
private final Instant initTime = Instant.now();
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ClusterEvent.CurrentClusterState.class, this::handle)
.match(ClusterEvent.MemberUp.class, this::handle)
.build().orElse(super.createReceive());
}
@Override
protected void handle(ClusterEvent.CurrentClusterState msg) {
super.handle(msg);
Collection<ActorRef> children = this.remoteActorsOf(ChildActor.props(), ChildActor.DEFAULT_NAME, SystemRole.SLAVE, true);
}
}
// excerpt from ClusterAwareActor
// [ ... ]
protected Collection<ActorRef> remoteActorsOf(Props props, String actorName, SystemRole role, boolean skipOwnSystem) {
Collection<ActorRef> results = Lists.newArrayList();
Collection<Member> members = this.systemRoleMemberMap.get(role);
for (Member member : members) {
if (cluster.selfMember().equals(member) && skipOwnSystem) continue;
Props remoteProps = props.withDeploy(new Deploy(new RemoteScope(member.address())));
if (actorName != null) {
results.add(this.getContext().actorOf(remoteProps, CustomStringUtils.randomActorName(actorName))); // this is just so I don't have name clashes
} else {
results.add(this.getContext().actorOf(remoteProps));
}
}
return results;
}
// ChildActor.java
public class ChildActor extends ClusterAwareActor {
public static final String DEFAULT_NAME = "ChildActor";
public static Props props() {
return Props.create(TestActorTwo.class, () -> new TestActorTwo());
}
TestActorTwo() {}
private final Instant initTime = Instant.now();
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ClusterEvent.CurrentClusterState.class, this::handle)
.match(Messages.PongWithSendTime.class, this::handle)
.build().orElse(super.createReceive());
}
@Override
protected void handle(ClusterEvent.CurrentClusterState msg) {
super.handle(msg);
Collection<ActorRef> children = this.remoteActorsOf(GrandChildActor.props(), SystemRole.SLAVE, true);
Preconditions.checkState(children.size() == 1);
Lists.newArrayList(children).get(0).tell(new Messages.PingMessage(), self());
log().info("[{}] Sent PING at {} ms", DEFAULT_NAME, Duration.between(this.initTime, Instant.now()).toMillis());
}
private void handle(Messages.PongWithTime msg) {
log().info("[{}] Received PONG at {} ms, transmission took ~ {} ms",
DEFAULT_NAME,
Duration.between(this.initTime, Instant.now()).toMillis(),
Duration.between(msg.getSendTime(), Instant.now()).toMillis());
}
}
// GrandChildActor.java
public class GrandChildActor extends AbstractLoggingActor {
public static final String DEFAULT_NAME = "GrandChildActor";
public static Props props() {
return Props.create(PongSayer.class, () -> new PongSayer());
}
PongSayer() {}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Messages.PingMessage.class, this::handle)
.build().orElse(super.createReceive());
}
private void handle(Messages.PingMessage __) {
getContext().getParent().tell(new Messages.PongWithTime(Instant.now()), self());
}
}
Все это прекрасно работает (т. е. сообщения проходят достаточно быстро), пока есть убежищене было других актеров, созданных и убитых ранее. В этом случае передача одного сообщения (только с Instant
) может занять от 10 до 30 с.
Журналы с ActorSystem
, содержащие ChildActor
, можно посмотреть здесь. :
[INFO] [10/05/2019 22:37:12.431] [dispatcher] [akka://app@172.18.5.173:7700/remote/akka/app@172.18.5.173:7900/user/ParenteActor/ChildActor] Created GrandChildActor and sent PingMessage.
[...]
[DEBUG] [10/05/2019 22:37:13.668] [dispatcher] [akka.actor.LocalActorRefProvider(akka://app)] Resolve (deserialization) of path [remote/akka/app@172.18.5.173:7900/user/path/to/other/unrelated/actor] doesn't match an active actor. It has probably been stopped, using deadLetters.
[DEBUG] [10/05/2019 22:37:13.738] [dispatcher] [akka.actor.LocalActorRefProvider(akka://app)] Resolve (deserialization) of path [remote/akka/app@172.18.5.173:7900/user/path/to/other/unrelated/actor] doesn't match an active actor. It has probably been stopped, using deadLetters.
[DEBUG] [10/05/2019 22:37:13.809] [dispatcher] [akka.actor.LocalActorRefProvider(akka://app)] Resolve (deserialization) of path [remote/akka/app@172.18.5.173:7900/user/path/to/other/unrelated/actor] doesn't match an active actor. It has probably been stopped, using deadLetters.
[...] // there is a metric ton of these
[DEBUG] [10/05/2019 22:37:32.498] [dispatcher] [akka.actor.LocalActorRefProvider(akka://app)] Resolve (deserialization) of path [remote/akka/app@172.18.5.173:7900/user/path/to/other/unrelated/actor] doesn't match an active actor. It has probably been stopped, using deadLetters.
[DEBUG] [10/05/2019 22:37:32.568] [dispatcher] [akka.actor.LocalActorRefProvider(akka://app)] Resolve (deserialization) of path [remote/akka/app@172.18.5.173:7900/user/path/to/other/unrelated/actor] doesn't match an active actor. It has probably been stopped, using deadLetters.
[DEBUG] [10/05/2019 22:37:32.638] [dispatcher] [akka.actor.LocalActorRefProvider(akka://app)] Resolve (deserialization) of path [remote/akka/app@172.18.5.173:7900/user/path/to/other/unrelated/actor] doesn't match an active actor. It has probably been stopped, using deadLetters.
[DEBUG] [10/05/2019 22:37:32.708] [dispatcher] [akka.actor.LocalActorRefProvider(akka://app)] Resolve (deserialization) of path [remote/akka/app@172.18.5.173:7900/user/path/to/other/unrelated/actor] doesn't match an active actor. It has probably been stopped, using deadLetters.
// before finally
[INFO] [10/05/2019 22:37:32.711] [dispatcher] [akka://app@172.18.5.173:7700/remote/akka/app@172.18.5.173:7900/user/ParentActor/ChildActor] Received Lookup response. Transmission took 20234 ms.