Akka: потерянная ссылка для `child.path.name` на ask - PullRequest
0 голосов
/ 04 марта 2019

Я пытаюсь реализовать Java-версию примера "Up and Running" из книги Мэннинга "Akka in Action".Это простой Http-сервер, основанный на Actor Model, для сохранения (просто в памяти) и извлечения некоторых событий.У меня нет проблем с сохранением событий.Но у меня есть проблема при запросе моей системы акторов на события (все события).

Это актуальный (я поставил тройные точки вместо кода, который, я думаю, не имеет ничего общего с моей проблемой)для BoxOffice - родительский актер для всех TicketSeller с (позднее они отвечают за управление состоянием для каждого события).

public class BoxOffice extends AbstractActor {

    ...
    private Timeout timeout;
    final static String NAME = "boxOffice";

    //create child actors
    private ActorRef createTicketSeller(String name) {
        return getContext().actorOf(TicketSeller.props(name));
    }

    public BoxOffice(Timeout timeout) {
        this.timeout = timeout;
    }

    //the only method of an actor
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                ...
                ...
                .match(GetEvent.class, this::receiveMsgGetEvent)
                .match(GetEvents.class, this::receiveMsgGetEvents)
                ...
                .build();
    }

    ...

    private void receiveMsgGetEvent(GetEvent getEvent) {
        Optional<ActorRef> maybeChild = getChildByName(getEvent.getName());
        log.info(String.format("Asking for event %s. Child is present: %s", getEvent.getName(), maybeChild.isPresent()));
        OptionalConsumer.of(maybeChild)
                .ifPresent(child -> child.forward(new TicketSeller.GetEvent(), getContext()))
                .ifNotPresent(() -> getSender().tell(Optional.empty(), getSelf()));
    }

    private void receiveMsgGetEvents(GetEvents getEvents) {
        //ask self() for each of the passed-in event
        List<CompletableFuture<Optional<Event>>> listFutureMaybeEvent =
                allChildrenStream()
                .map(child ->
                        ask(getSelf(), new GetEvent(child.path().name()), timeout)
                        .thenApply(obj -> (Optional<Event>) obj)
                        .toCompletableFuture())
                .collect(toList());

        CompletableFuture<Events> eventsFuture = toFutureEvents(listFutureMaybeEvent);
        pipe(eventsFuture, getContext().dispatcher()).to(sender());
    }

    private Stream<ActorRef> allChildrenStream() {
        return StreamSupport.stream(getContext().getChildren().spliterator(), false);
    }

    ...

    private CompletableFuture<Events> toFutureEvents(List<CompletableFuture<Optional<Event>>> futurePossibleEvents) {
        List<Event> events = futurePossibleEvents.stream()
                .map(CompletableFuture::join)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(toList());
        return CompletableFuture.supplyAsync(() -> new Events(events));
    }

    ...

    private Optional<ActorRef> getChildByName(String name) {
        return getContext().findChild(name);
    }

    static Props props(Timeout timeout) {
        return Props.create(BoxOffice.class, () -> new BoxOffice(timeout));
    }

В основном происходит то, что в receiveMsgGetEvents я отправляю сообщение self с сообщением, содержащим дочернее имя child.path.name.Однако, когда я получаю это сообщение (соответственно receiveMsgGetEvent), дочерний актер не может быть найден по этому имени:

INFO  [BoxOffice]: Asking for event $a. Child is present: false

Также следует отметить, что между отправкой GetEvent иполучает тот же актер (как секунды, но я чувствую, что это меньше, чем 20).

Возможно, проблема связана с моими манипуляциями CompletableFutures, но я попытался воспроизвести эквивалентный код scala.

Журнал информации сверху вместе с этим сообщением:

INFO  [DeadLetterActorRef]: Message [java.util.Optional] from Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585] to Actor[akka://mycompanyAkkaDemo/deadLetters] was not delivered. [1] dead letters encountered. This logging...

печатаются после трассировки стека, которая печатается через настроенное время ожидания (20 секунд):

ERROR [ActorSystemImpl]: Error during processing of request: 'Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvents".'. Completing with 500 Internal Server Error response. To change default exception handling behavior, provide a custom ExceptionHandler.
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvents".
    at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:595)
    at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:605)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:140)
    ...
    at java.lang.Thread.run(Thread.java:748)
ERROR [OneForOneStrategy]: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvent".
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvent".
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
    ...
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvent".
    at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:595)
    ... 11 common frames omitted

1 Ответ

0 голосов
/ 07 марта 2019

Что здесь не так, так это то, что в диспетчере есть блокировка.

В JVM - потоки, поддерживаемые потоками операционной системы, которые дороги как в памяти, так и в накладных расходах планировщика процессов.Одним из преимуществ Akka является то, что он позволяет более эффективно использовать потоки, позволяя запускать множество акторов на меньшем количестве потоков.

Это замечательно, но означает, что вы никогда не должны выполнять блокирующий вызоввнутри актера.Вызов CompletableFuture::join здесь блокирует, что, вероятно, является причиной вашей проблемы.

Избегая блокирования вызовов и использования асинхронных API (таких как CompletableFuture.allOf), ваша проблема должна исчезнуть.

...