Я работаю над приведенной ниже системой обработки потоков, чтобы получать кадры из одного источника, обрабатывать и отправлять в другой. Я использую комбинацию akka-streams
и akka-http
через их scapa api. Конвейер очень короткий, но я не могу определить, где система решает остановиться после 100 запросов к конечной точке.
object frameProcessor extends App {
implicit val system: ActorSystem = ActorSystem("VideoStreamProcessor")
val decider: Supervision.Decider = _ => Supervision.Restart
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val dispatcher: ExecutionContextExecutor = system.dispatcher
val http = Http(system)
val sourceConnectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = http.outgoingConnection(sourceUri)
val byteFlow: Flow[HttpResponse, Future[ByteString], NotUsed] =
Flow[HttpResponse].map(_.entity.dataBytes.runFold(ByteString.empty)(_ ++ _))
Source.repeat(HttpRequest(uri = sourceUri))
.via(sourceConnectionFlow)
.via(byteFlow)
.map(postFrame)
.runWith(Sink.ignore)
.onComplete(_ => system.terminate())
def postFrame(imageBytes: Future[ByteString]): Unit = {
imageBytes.onComplete{
case Success(res) => system.log.info(s"post frame. ${res.length} bytes")
case Failure(_) => system.log.error("failed to post image!")
}
}
}
Для справки, я использую akka-streams
версию 2.5.19 и akka-http
версию 10.1.7 . Нет ошибок, нет кодов ошибок на исходном сервере, откуда приходят кадры, и программа завершается с кодом ошибки 0.
Мой application.conf
выглядит следующим образом:
logging = "DEBUG"
Всегда обрабатывается 100 единиц.
Спасибо!
Редактировать
Добавлена регистрация в потоке, вот так
.onComplete{
case Success(res) => {
system.log.info(res.toString)
system.terminate()
}
case Failure(res) => {
system.log.error(res.getMessage)
system.terminate()
}
}
Получено исключение сброса соединения, но это не соответствует. Поток завершается с Done
.
Редактировать 2
Используя .mapAsync(1)(postFrame)
я получаю тот же Success(Done)
после 100 запросов. Кроме того, когда я проверяю сервер nginx access.log
и error.log
, появляются только 200
ответы.
Мне пришлось изменить postFrame
следующим образом для запуска mapAsync
def postFrame(imageBytes: Future[ByteString]): Future[Unit] = {
imageBytes.onComplete{
case Success(res) => system.log.info(s"post frame. ${res.length} bytes")
case Failure(_) => system.log.error("failed to post image!")
}
Future(Unit)
}