Я использую стороннюю библиотеку для предоставления сервисов синтаксического анализа (в моем случае - пользовательский агент), которая не является поточно-ориентированной библиотекой и должна работать на однопоточной основе.Я хотел бы написать потокобезопасный API, который может вызываться несколькими потоками для взаимодействия с ним через Futures API, поскольку библиотека может вводить некоторую потенциальную блокировку (IO).Я также хотел бы оказать обратное давление в случае необходимости и вернуть неудачное будущее, когда парсер не догонит производителей.
На самом деле это может быть общее требование / вопрос о том, как взаимодействовать с любым клиентом /библиотека, которая не является поточно-ориентированной (пользовательские агенты / парсеры географического местоположения, клиенты БД, такие как redis, коллекторы регистраторов, такие как fluentd), с обратным давлением в параллельных средах.
Я придумал следующую формулу:
инкапсулирует анализатор в выделенном Actor.
создает исходную очередь потока akka, которая получает ParseReuqest, который содержит пользовательский агент и Promise для завершения, ииспользуя шаблон запроса через mapAsync для взаимодействия с субъектом синтаксического анализа.
создайте другого субъекта для инкапсуляции исходной очереди.
Это способидти?Есть ли другой способ добиться этого, может быть, проще?может быть, с использованием графа стадии?это может быть сделано без шаблона аска и меньшего количества вовлеченного кода?
актер, упомянутый в номере 3, потому что я не уверен, является ли исходная очередь потокобезопасной или нет ??Я хотел бы, чтобы это было просто указано в документации, но это не так.Есть несколько версий в Интернете, некоторые утверждают, что это не так, а некоторые заявляют, что это так.
Является ли исходная очередь после материализации потокобезопасной для выталкивания элементов из разных потоков?
(код может не скомпилироваться и подвержен потенциальным сбоям; он предназначен только для решения этого вопроса)
class UserAgentRepo(dbFilePath: String)(implicit actorRefFactory: ActorRefFactory) {
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
implicit val askTimeout = Timeout(5 seconds)
// API to parser - delegates the request to the back pressure actor
def parse(userAgent: String): Future[Option[UserAgentData]] = {
val p = Promise[Option[UserAgentData]]
parserBackPressureProvider ! UserAgentParseRequest(userAgent, p)
p.future
}
// Actor to provide back pressure that delegates requests to parser actor
private class ParserBackPressureProvider extends Actor {
private val parser = context.actorOf(Props[UserAgentParserActor])
val queue = Source.queue[UserAgentParseRequest](100, OverflowStrategy.dropNew)
.mapAsync(1)(request => (parser ? request.userAgent).mapTo[Option[UserAgentData]].map(_ -> request.p))
.to(Sink.foreach({
case (result, promise) => promise.success(result)
}))
.run()
override def receive: Receive = {
case request: UserAgentParseRequest => queue.offer(request).map {
case QueueOfferResult.Enqueued =>
case _ => request.p.failure(new RuntimeException("parser busy"))
}
}
}
// Actor parser
private class UserAgentParserActor extends Actor {
private val up = new UserAgentParser(dbFilePath, true, 50000)
override def receive: Receive = {
case userAgent: String =>
sender ! Try {
up.parseUa(userAgent)
}.toOption.map(UserAgentData(userAgent, _))
}
}
private case class UserAgentParseRequest(userAgent: String, p: Promise[Option[UserAgentData]])
private val parserBackPressureProvider = actorRefFactory.actorOf(Props[ParserBackPressureProvider])
}