интеграция с не-потокобезопасным сервисом при сохранении противодействия в параллельной среде с использованием Futures, потоков akka и актеров akka - PullRequest
0 голосов
/ 15 февраля 2019

Я использую стороннюю библиотеку для предоставления сервисов синтаксического анализа (в моем случае - пользовательский агент), которая не является поточно-ориентированной библиотекой и должна работать на однопоточной основе.Я хотел бы написать потокобезопасный API, который может вызываться несколькими потоками для взаимодействия с ним через Futures API, поскольку библиотека может вводить некоторую потенциальную блокировку (IO).Я также хотел бы оказать обратное давление в случае необходимости и вернуть неудачное будущее, когда парсер не догонит производителей.

На самом деле это может быть общее требование / вопрос о том, как взаимодействовать с любым клиентом /библиотека, которая не является поточно-ориентированной (пользовательские агенты / парсеры географического местоположения, клиенты БД, такие как redis, коллекторы регистраторов, такие как fluentd), с обратным давлением в параллельных средах.

Я придумал следующую формулу:

  1. инкапсулирует анализатор в выделенном Actor.

  2. создает исходную очередь потока akka, которая получает ParseReuqest, который содержит пользовательский агент и Promise для завершения, ииспользуя шаблон запроса через mapAsync для взаимодействия с субъектом синтаксического анализа.

  3. создайте другого субъекта для инкапсуляции исходной очереди.

Это способидти?Есть ли другой способ добиться этого, может быть, проще?может быть, с использованием графа стадии?это может быть сделано без шаблона аска и меньшего количества вовлеченного кода?

актер, упомянутый в номере 3, потому что я не уверен, является ли исходная очередь потокобезопасной или нет ??Я хотел бы, чтобы это было просто указано в документации, но это не так.Есть несколько версий в Интернете, некоторые утверждают, что это не так, а некоторые заявляют, что это так.

Является ли исходная очередь после материализации потокобезопасной для выталкивания элементов из разных потоков?

(код может не скомпилироваться и подвержен потенциальным сбоям; он предназначен только для решения этого вопроса)

class UserAgentRepo(dbFilePath: String)(implicit actorRefFactory: ActorRefFactory) {

import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
implicit val askTimeout = Timeout(5 seconds)

// API to parser - delegates the request to the back pressure actor
def parse(userAgent: String): Future[Option[UserAgentData]] = {
  val p = Promise[Option[UserAgentData]]
  parserBackPressureProvider ! UserAgentParseRequest(userAgent, p)
  p.future
}

// Actor to provide back pressure that delegates requests to parser actor
private class ParserBackPressureProvider extends Actor {
  private val parser = context.actorOf(Props[UserAgentParserActor])

  val queue = Source.queue[UserAgentParseRequest](100, OverflowStrategy.dropNew)
    .mapAsync(1)(request => (parser ? request.userAgent).mapTo[Option[UserAgentData]].map(_ -> request.p))
    .to(Sink.foreach({
      case (result, promise) => promise.success(result)
    }))
    .run()

  override def receive: Receive = {
    case request: UserAgentParseRequest => queue.offer(request).map {
      case QueueOfferResult.Enqueued =>
      case _ => request.p.failure(new RuntimeException("parser busy"))
    }
  }
}

// Actor parser
private class UserAgentParserActor extends Actor {
  private val up = new UserAgentParser(dbFilePath, true, 50000)
  override def receive: Receive = {
    case userAgent: String =>
      sender ! Try {
      up.parseUa(userAgent)
    }.toOption.map(UserAgentData(userAgent, _))
  }
}

private case class UserAgentParseRequest(userAgent: String, p: Promise[Option[UserAgentData]])

private val parserBackPressureProvider = actorRefFactory.actorOf(Props[ParserBackPressureProvider])

}

1 Ответ

0 голосов
/ 15 февраля 2019

У вас есть , чтобы использовать актеров для этого?

Не похоже, что вам нужна вся эта сложность, scala / java имеет все необходимые инструменты "из коробки":

 class ParserFacade(parser: UserAgentParser, val capacity: Int = 100) {
    private implicit val ec = ExecutionContext
      .fromExecutor(
         new ThreadPoolExecutor(
           1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(capacity)
         )
      )

   def parse(ua: String): Future[Option[UserAgentData]] = try {
     Future(Some(UserAgentData(ua, parser.parseUa(ua)))
       .recover { _ => None }
   } catch {
     case _: RejectedExecutionException => 
       Future.failed(new RuntimeException("parser is busy"))
   }
}
...