Я пытаюсь реализовать Java-версию примера "Up and Running" из книги Мэннинга "Akka in Action".Это простой Http-сервер, основанный на Actor Model, для сохранения (просто в памяти) и извлечения некоторых событий.У меня нет проблем с сохранением событий.Но у меня есть проблема при запросе моей системы акторов на события (все события).
Это актуальный (я поставил тройные точки вместо кода, который, я думаю, не имеет ничего общего с моей проблемой)для BoxOffice
- родительский актер для всех TicketSeller
с (позднее они отвечают за управление состоянием для каждого события).
public class BoxOffice extends AbstractActor {
...
private Timeout timeout;
final static String NAME = "boxOffice";
//create child actors
private ActorRef createTicketSeller(String name) {
return getContext().actorOf(TicketSeller.props(name));
}
public BoxOffice(Timeout timeout) {
this.timeout = timeout;
}
//the only method of an actor
@Override
public Receive createReceive() {
return receiveBuilder()
...
...
.match(GetEvent.class, this::receiveMsgGetEvent)
.match(GetEvents.class, this::receiveMsgGetEvents)
...
.build();
}
...
private void receiveMsgGetEvent(GetEvent getEvent) {
Optional<ActorRef> maybeChild = getChildByName(getEvent.getName());
log.info(String.format("Asking for event %s. Child is present: %s", getEvent.getName(), maybeChild.isPresent()));
OptionalConsumer.of(maybeChild)
.ifPresent(child -> child.forward(new TicketSeller.GetEvent(), getContext()))
.ifNotPresent(() -> getSender().tell(Optional.empty(), getSelf()));
}
private void receiveMsgGetEvents(GetEvents getEvents) {
//ask self() for each of the passed-in event
List<CompletableFuture<Optional<Event>>> listFutureMaybeEvent =
allChildrenStream()
.map(child ->
ask(getSelf(), new GetEvent(child.path().name()), timeout)
.thenApply(obj -> (Optional<Event>) obj)
.toCompletableFuture())
.collect(toList());
CompletableFuture<Events> eventsFuture = toFutureEvents(listFutureMaybeEvent);
pipe(eventsFuture, getContext().dispatcher()).to(sender());
}
private Stream<ActorRef> allChildrenStream() {
return StreamSupport.stream(getContext().getChildren().spliterator(), false);
}
...
private CompletableFuture<Events> toFutureEvents(List<CompletableFuture<Optional<Event>>> futurePossibleEvents) {
List<Event> events = futurePossibleEvents.stream()
.map(CompletableFuture::join)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toList());
return CompletableFuture.supplyAsync(() -> new Events(events));
}
...
private Optional<ActorRef> getChildByName(String name) {
return getContext().findChild(name);
}
static Props props(Timeout timeout) {
return Props.create(BoxOffice.class, () -> new BoxOffice(timeout));
}
В основном происходит то, что в receiveMsgGetEvents
я отправляю сообщение self
с сообщением, содержащим дочернее имя child.path.name
.Однако, когда я получаю это сообщение (соответственно receiveMsgGetEvent
), дочерний актер не может быть найден по этому имени:
INFO [BoxOffice]: Asking for event $a. Child is present: false
Также следует отметить, что между отправкой GetEvent
иполучает тот же актер (как секунды, но я чувствую, что это меньше, чем 20).
Возможно, проблема связана с моими манипуляциями CompletableFutures
, но я попытался воспроизвести эквивалентный код scala.
Журнал информации сверху вместе с этим сообщением:
INFO [DeadLetterActorRef]: Message [java.util.Optional] from Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585] to Actor[akka://mycompanyAkkaDemo/deadLetters] was not delivered. [1] dead letters encountered. This logging...
печатаются после трассировки стека, которая печатается через настроенное время ожидания (20 секунд):
ERROR [ActorSystemImpl]: Error during processing of request: 'Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvents".'. Completing with 500 Internal Server Error response. To change default exception handling behavior, provide a custom ExceptionHandler.
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvents".
at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:595)
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:605)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:140)
...
at java.lang.Thread.run(Thread.java:748)
ERROR [OneForOneStrategy]: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvent".
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvent".
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
...
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvent".
at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:595)
... 11 common frames omitted