Akka - лучший подход для настройки Actor, который использует службу REST API (операция блокировки) - PullRequest
1 голос
/ 14 июня 2019

У меня есть механизм обмена сообщениями Akka, который доставляет миллионы сообщений в течение дня, как по SMS, так и по электронной почте.Мне нужно представить новый тип обмена сообщениями (PushNotification), который заключается в том, что каждый запрос потребляет REST API (он также обрабатывает миллионы).Я считаю, что использование Web-сервиса является блокирующей операцией, поэтому из того, что я прочитал, мне нужно добавить отдельный диспетчер для этого нового Actor, мои вопросы таковы: обязательно ли он должен быть исполнителем пула потоков с фиксированным пуломразмер, как упомянуто здесь?(См. https://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html) или вместо этого можно использовать fork-join-executor? Кроме того, каков наилучший подход, чтобы не влиять на текущие 2 типа сообщений? (SMS и EMAIL) Я имею в виду, как я могучтобы избежать истощения их пула потоков? В настоящее время EMAIL использует отдельный диспетчер, а SMS использует диспетчер по умолчанию. Вместо создания нового диспетчера для субъекта с операцией блокировки (вызова WebService) есть ли другой способ? Например, создание реактивного вебасервис?

1 Ответ

2 голосов
/ 14 июня 2019

Использование RESTful API из веб-службы не обязательно должно блокировать.

Простой способ использовать RESTful API у субъекта - использовать Akka HTTP Client .Это позволяет вам отправлять HTTP-запрос и отправлять результат в виде сообщения актеру, используя метод pipeTo.

Это очень урезанный пример (слегка измененный из примера в документации).

import akka.http.scaladsl.Http

object RestWorker {
  def props(replyTo: ActorRef): Props =
    Props(new RestWorker(replyTo))
}

class RestWorker(replyTo: ActorRef) extends Actor
{
  implicit val ec: ExecutionContext = context.system.dispatcher

  override def preStart() = {
    Http(context.system).singleRequest(HttpRequest(uri = "https://1.2.3.4/resource"))
      .pipeTo(self)
  }

  def receive = {
    case resp: HttpResponse =>

      val response = ??? // Process response

      replyTo ! response

      self ! PoisonPill
  }
}
...