Как взаимодействовать с Akka Actors через Akka HTTP (Java) - PullRequest
0 голосов
/ 19 января 2019

Тема

Я бы хотел взаимодействовать с Akka Actor через Akka HTTP . Идея состоит в том, чтобы создать систему, в которой HTTP-клиент вызывает метод HTTP-сервера Akka, который обрабатывает запрос к Akka Actor. Актер обрабатывает сообщение и отвечает обратно вызывающей стороне (Akka HTTP), которая отвечает HTTP-клиенту. Мне удалось сделать, как описано выше, но я думаю, что я делаю это неправильно, так как моя реализация кажется блокирующей.

Я объясняю лучше: если я делаю много одновременных HTTP-запросов, я вижу, что Akka HTTP "создает очередь", ожидая, пока субъект обработает запрос, прежде чем отправить его следующим образом.

Вместо этого я хотел бы получить следующее: HTTP-сервер Akka перенаправляет запросы, поступающие от клиентов HTTP, немедленно целевому субъекту akka, не дожидаясь, пока субъект завершит разработку. Я хотел бы использовать параметр размера почтового ящика субъекта, чтобы определить, насколько велика очередь сообщений, и отклонить сообщения, если их слишком много.

Таким образом, мне нужен способ заставить Akka HTTP асинхронно ждать ответа актера.

Я знаю, что емкость почтового ящика работает правильно, потому что если я вместо этого сделаю много запросов своему актеру, используя простой actor2.tell ("Prova1", system.deadLetters ()) (только для тестирования ), запросы, превышающие размер почтового ящика, корректно отклоняются.


Ссылки

Для тестирования моей системы я создал простую конфигурацию, следуя минимальным примерам, предоставленным документацией akka. Это для Акка http: https://doc.akka.io/docs/akka-http/current/routing-dsl/index.html#minimal-example

и следующее для создания моего актера: https://doc.akka.io/docs/akka/current/actors.html#creating-actors


Мой код

Первым делом я создал систему с одним актером (actor1), настроив akka HTTP следующим образом:

public class TestActor {

    private static ActorSystem system;

    public static void main(String[] args) throws InterruptedException
    {
        String httpBindAddress = "0.0.0.0";
        int httpPort = 8086;        
        system = ActorSystem.create("deupnp");
        ActorMaterializer materializer = ActorMaterializer.create(system);      
        Http http = Http.get(system);
        AllDirectives app = new AllDirectives() {           
        };

        Route routeActor =  app.get(() ->
        app.pathPrefix("mysuburl", () -> 
        app.pathPrefix(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, actor -> 
        app.path(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, message -> 
        app.onSuccess(() -> 
        CompletableFuture.supplyAsync(() -> actorFunctionCall(actor, message)), response -> 
        app.complete(StatusCodes.get(200), response))))));

        Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.route(routeActor).flow(system, materializer);
        CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow, ConnectHttp.toHost(httpBindAddress, httpPort), materializer);

        // create system with one actor
        ActorRef actor1 = system.actorOf(Props.create(ActorTest.class,"actor1").withMailbox("my-mailbox"),"actor1");    
    }

    private static String actorFunctionCall(String actor, String message)
    {
        try {
            Inbox inbox = Inbox.create(system);
            system.actorSelection("user/"+actor).tell(message, inbox.getRef());
            String response  = (String) inbox.receive(Duration.create(20000, TimeUnit.SECONDS));
            return response;
        } catch (Exception e) {
            //return new ResponseMessage(204,"Error");
            e.printStackTrace();
            return null;
        }
    }
}

где мой ActorTest следующий:

public class ActorTest extends AbstractActor {

    private String myName = ""; 

    public ActorTest(String nome){
        this.myName = nome;
    }

    @Override
    public void preStart()
    {
    }

    @Override
    public Receive createReceive() {        
        return receiveBuilder()
            .match(String.class, 
                message -> {
                Thread.sleep(5000l);
                System.out.println(this.getClass().getName() + " >> " + myName + " >> " + message);
            })
            .matchAny(mex->{
                System.out.println("Error");
            })
            .build();
    }   
}

мое application.conf очень просто:

akka
{
    stdout-loglevel = "DEBUG"
    loglevel = "DEBUG"
    actor {
        default-dispatcher {
            throughput = 10
        }
    }
}

my-mailbox {
    mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
    mailbox-capacity = 1
}


Ожидаемые результаты

Как видите, с mailbox -acity = 1 Я бы ожидал, что если я делаю более 1 одновременных запросов, обрабатывается только один, а остальные отбрасываются.

Я думаю, что приведенный выше код неверен для того, что я хочу получить, поскольку я использую маршрутизацию Akka HTTP для получения HTTP-запросов на http://127.0.0.1/mysuburl/actor1/my_msg, а затем использую Входящие для отправки сообщение актеру и дождитесь ответа.

Итак, мой вопрос: как правильно связать мой HTTP-запрос Akka с моим актером Akka Actor 1 асинхронным способом?

Пожалуйста, дайте мне знать, если вам нужна дополнительная информация.

Примечание

Я даже прочитал следующую статью: https://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html

, который объясняет, как создать конечное число потоков для обработки нескольких запросов на блокировку, но я думаю, что это только «смягчает» последствия моего кода, который блокирует, но должен быть написан так, чтобы это не блокировки.

...