Тема
Я бы хотел взаимодействовать с Akka Actor через Akka HTTP . Идея состоит в том, чтобы создать систему, в которой HTTP-клиент вызывает метод HTTP-сервера Akka, который обрабатывает запрос к Akka Actor. Актер обрабатывает сообщение и отвечает обратно вызывающей стороне (Akka HTTP), которая отвечает HTTP-клиенту.
Мне удалось сделать, как описано выше, но я думаю, что я делаю это неправильно, так как моя реализация кажется блокирующей.
Я объясняю лучше: если я делаю много одновременных HTTP-запросов, я вижу, что Akka HTTP "создает очередь", ожидая, пока субъект обработает запрос, прежде чем отправить его следующим образом.
Вместо этого я хотел бы получить следующее: HTTP-сервер Akka перенаправляет запросы, поступающие от клиентов HTTP, немедленно целевому субъекту akka, не дожидаясь, пока субъект завершит разработку.
Я хотел бы использовать параметр размера почтового ящика субъекта, чтобы определить, насколько велика очередь сообщений, и отклонить сообщения, если их слишком много.
Таким образом, мне нужен способ заставить Akka HTTP асинхронно ждать ответа актера.
Я знаю, что емкость почтового ящика работает правильно, потому что если я вместо этого сделаю много запросов своему актеру, используя простой actor2.tell ("Prova1", system.deadLetters ()) (только для тестирования ), запросы, превышающие размер почтового ящика, корректно отклоняются.
Ссылки
Для тестирования моей системы я создал простую конфигурацию, следуя минимальным примерам, предоставленным документацией akka.
Это для Акка http:
https://doc.akka.io/docs/akka-http/current/routing-dsl/index.html#minimal-example
и следующее для создания моего актера:
https://doc.akka.io/docs/akka/current/actors.html#creating-actors
Мой код
Первым делом я создал систему с одним актером (actor1), настроив akka HTTP следующим образом:
public class TestActor {
private static ActorSystem system;
public static void main(String[] args) throws InterruptedException
{
String httpBindAddress = "0.0.0.0";
int httpPort = 8086;
system = ActorSystem.create("deupnp");
ActorMaterializer materializer = ActorMaterializer.create(system);
Http http = Http.get(system);
AllDirectives app = new AllDirectives() {
};
Route routeActor = app.get(() ->
app.pathPrefix("mysuburl", () ->
app.pathPrefix(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, actor ->
app.path(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, message ->
app.onSuccess(() ->
CompletableFuture.supplyAsync(() -> actorFunctionCall(actor, message)), response ->
app.complete(StatusCodes.get(200), response))))));
Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.route(routeActor).flow(system, materializer);
CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow, ConnectHttp.toHost(httpBindAddress, httpPort), materializer);
// create system with one actor
ActorRef actor1 = system.actorOf(Props.create(ActorTest.class,"actor1").withMailbox("my-mailbox"),"actor1");
}
private static String actorFunctionCall(String actor, String message)
{
try {
Inbox inbox = Inbox.create(system);
system.actorSelection("user/"+actor).tell(message, inbox.getRef());
String response = (String) inbox.receive(Duration.create(20000, TimeUnit.SECONDS));
return response;
} catch (Exception e) {
//return new ResponseMessage(204,"Error");
e.printStackTrace();
return null;
}
}
}
где мой ActorTest следующий:
public class ActorTest extends AbstractActor {
private String myName = "";
public ActorTest(String nome){
this.myName = nome;
}
@Override
public void preStart()
{
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class,
message -> {
Thread.sleep(5000l);
System.out.println(this.getClass().getName() + " >> " + myName + " >> " + message);
})
.matchAny(mex->{
System.out.println("Error");
})
.build();
}
}
мое application.conf очень просто:
akka
{
stdout-loglevel = "DEBUG"
loglevel = "DEBUG"
actor {
default-dispatcher {
throughput = 10
}
}
}
my-mailbox {
mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
mailbox-capacity = 1
}
Ожидаемые результаты
Как видите, с mailbox -acity = 1 Я бы ожидал, что
если я делаю более 1 одновременных запросов, обрабатывается только один, а остальные отбрасываются.
Я думаю, что приведенный выше код неверен для того, что я хочу получить, поскольку я использую маршрутизацию Akka HTTP для получения HTTP-запросов на http://127.0.0.1/mysuburl/actor1/my_msg, а затем использую Входящие для отправки сообщение актеру и дождитесь ответа.
Итак, мой вопрос: как правильно связать мой HTTP-запрос Akka с моим актером Akka Actor 1 асинхронным способом?
Пожалуйста, дайте мне знать, если вам нужна дополнительная информация.
Примечание
Я даже прочитал следующую статью:
https://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html
, который объясняет, как создать конечное число потоков для обработки нескольких запросов на блокировку, но я думаю, что это только «смягчает» последствия моего кода, который блокирует, но должен быть написан так, чтобы это не блокировки.