Получить актера Akka или создать его, если он не существует - PullRequest
5 голосов
/ 18 марта 2019

Я занимаюсь разработкой приложения, которое создает актеров Akka для управления и обработки сообщений, приходящих на тему Kafka. Сообщения с одинаковым ключом обрабатываются одним и тем же субъектом. Я также использую ключ сообщения, чтобы назвать соответствующего актера.

Когда из темы читается новое сообщение, я не знаю, был ли актер с идентификатором, равным ключу сообщения, уже создан системой субъекта или нет. Поэтому я пытаюсь разрешить актера, используя его имя, и, если он еще не существует, я создаю его. Мне нужно управлять параллелизмом в отношении разрешения актера. Таким образом, возможно, что более чем один клиент спрашивает систему акторов , существует ли актер.

Код, который я сейчас использую, следующий:

private CompletableFuture<ActorRef> getActor(String uuid) {
    return system.actorSelection(String.format("/user/%s", uuid))
                 .resolveOne(Duration.ofMillis(1000))
                 .toCompletableFuture()
                 .exceptionally(ex -> 
                     system.actorOf(Props.create(MyActor.class, uuid), uuid))
                 .exceptionally(ex -> {
                     try {
                         return system.actorSelection(String.format("/user/%s",uuid)).resolveOne(Duration.ofMillis(1000)).toCompletableFuture().get();
                     } catch (InterruptedException | ExecutionException e) {
                         throw new RuntimeException(e);
                     }
                 });
}

Приведенный выше код не оптимизирован, и обработка исключений может быть улучшена.

Однако, есть ли в Akka более идиоматический способ разрешения актера или его создания, если он не существует? Я что-то упустил?

Ответы [ 3 ]

4 голосов
/ 19 марта 2019

Рассмотрите возможность создания субъекта, который в качестве своего состояния поддерживает отображение идентификаторов сообщений на ActorRef s. Этот актер "регистратор" будет обрабатывать все запросы для получения актера обработки сообщений. Когда администратор получает запрос на актера (запрос будет включать в себя идентификатор сообщения), он пытается найти ассоциированного актера на своей карте: если такой актер найден, он возвращает ActorRef отправителю; в противном случае он создает нового субъекта обработки, добавляет его в свою карту и возвращает ссылку на этот субъект отправителю.

1 голос
/ 24 марта 2019

Я бы рассмотрел использование akka-cluster и akka-cluster-sharding. Во-первых, это дает вам пропускную способность , а также надежность. Однако это также заставит систему управлять созданием субъектов «сущности».

Но вы должны изменить способ общения с этими актерами. Вы создаете ShardRegion актер, который обрабатывает все сообщения:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
import akka.cluster.sharding.ShardRegion;
import akka.event.Logging;
import akka.event.LoggingAdapter;


public class MyEventReceiver extends AbstractActor {

    private final ActorRef shardRegion;

    public static Props props() {
        return Props.create(MyEventReceiver.class, MyEventReceiver::new);
    }

    static ShardRegion.MessageExtractor messageExtractor 
      = new ShardRegion.HashCodeMessageExtractor(100) {
            // using the supplied hash code extractor to shard
            // the actors based on the hashcode of the entityid

        @Override
        public String entityId(Object message) {
            if (message instanceof EventInput) {
                return ((EventInput) message).uuid().toString();
            }
            return null;
        }

        @Override
        public Object entityMessage(Object message) {
            if (message instanceof EventInput) {
                return message;
            }
            return message; // I don't know why they do this it's in the sample
        }
    };


    public MyEventReceiver() {
        ActorSystem system = getContext().getSystem();
        ClusterShardingSettings settings =
           ClusterShardingSettings.create(system);
        // this is setup for the money shot
        shardRegion = ClusterSharding.get(system)
                .start("EventShardingSytem",
                        Props.create(EventActor.class),
                        settings,
                        messageExtractor);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(
                EventInput.class,
                e -> {
                    log.info("Got an event with UUID {} forwarding ... ",
                            e.uuid());
                    // the money shot
                    deviceRegion.tell(e, getSender());
                }
        ).build();
    }
}

Итак, этот Actor MyEventReceiver работает на всех узлах вашего кластера и инкапсулирует shardRegion Actor. Вы больше не отправляете сообщения своим EventActor напрямую, но, используя акторы MyEventReceiver и deviceRegion, вы используете систему шардинга, которая отслеживает, в каком узле кластера живет определенный EventActor , Он создаст его, если он еще не был создан, или перенаправит сообщения, если он был создан. Каждый EventActor должен иметь уникальный идентификатор: который извлекается из сообщения (так что UUID довольно хорош для этого, но это может быть какой-то другой идентификатор, такой как customerID или orderID, или что-то еще, если он уникален для экземпляра Actor, с которым вы хотите его обработать).

(я опускаю код EventActor, в остальном это довольно обычный актер, в зависимости от того, что вы делаете с ним, «магия» в приведенном выше коде).

Система шардинга автоматически знает, как создать EventActor и присвоить его шарду на основе выбранного вами алгоритма (в данном конкретном случае она основана на hashCode уникального идентификатора, который все Я когда-либо использовал). Кроме того, вы гарантировано только один Актер для любого данного уникального идентификатора. Сообщение прозрачно направляется на правильный узел и осколок, где бы он ни находился; от того узла и осколка, куда он отправляется.

Больше информации и примеров кода на сайте Akka и в документации.

Это довольно приятный способ убедиться, что один и тот же объект / субъект всегда обрабатывает сообщения, предназначенные для него. Кластер и сегментирование автоматически заботятся о правильном распределении акторов, об отказе и т. Д. (Вам нужно было бы добавить akka-persistence, чтобы получить пассивацию, регидратацию и отработку отказа, если с действующим субъектом связано несколько строгих состояний (что должно быть восстановленным)).

1 голос
/ 19 марта 2019

Ответ Джеффри Чанга действительно относится к Акке. Недостатком такого подхода является его низкая производительность. Наиболее эффективное решение - использовать метод Java ConcurrentHashMap.computeIfAbsent () .

...