Akka streams Source.repeat останавливается после 100 запросов - PullRequest
0 голосов
/ 08 января 2019

Я работаю над приведенной ниже системой обработки потоков, чтобы получать кадры из одного источника, обрабатывать и отправлять в другой. Я использую комбинацию akka-streams и akka-http через их scapa api. Конвейер очень короткий, но я не могу определить, где система решает остановиться после 100 запросов к конечной точке.

object frameProcessor extends App {
  implicit val system: ActorSystem = ActorSystem("VideoStreamProcessor")
  val decider: Supervision.Decider = _ => Supervision.Restart
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val dispatcher: ExecutionContextExecutor = system.dispatcher
  val http = Http(system)
  val sourceConnectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = http.outgoingConnection(sourceUri)

  val byteFlow: Flow[HttpResponse, Future[ByteString], NotUsed] =
    Flow[HttpResponse].map(_.entity.dataBytes.runFold(ByteString.empty)(_ ++ _))

  Source.repeat(HttpRequest(uri = sourceUri))
    .via(sourceConnectionFlow)
    .via(byteFlow)
    .map(postFrame)
    .runWith(Sink.ignore)
    .onComplete(_ => system.terminate())

  def postFrame(imageBytes: Future[ByteString]): Unit = {
    imageBytes.onComplete{
      case Success(res) => system.log.info(s"post frame. ${res.length} bytes")
      case Failure(_) => system.log.error("failed to post image!")
    }
  }
}

Для справки, я использую akka-streams версию 2.5.19 и akka-http версию 10.1.7 . Нет ошибок, нет кодов ошибок на исходном сервере, откуда приходят кадры, и программа завершается с кодом ошибки 0.

Мой application.conf выглядит следующим образом:

logging = "DEBUG"

Всегда обрабатывается 100 единиц.

Спасибо!

Редактировать

Добавлена ​​регистрация в потоке, вот так

.onComplete{
  case Success(res) => {
    system.log.info(res.toString)
    system.terminate()
  }
  case Failure(res) => {
    system.log.error(res.getMessage)
    system.terminate()
  }

}

Получено исключение сброса соединения, но это не соответствует. Поток завершается с Done.

Редактировать 2

Используя .mapAsync(1)(postFrame) я получаю тот же Success(Done) после 100 запросов. Кроме того, когда я проверяю сервер nginx access.log и error.log, появляются только 200 ответы.

Мне пришлось изменить postFrame следующим образом для запуска mapAsync

def postFrame(imageBytes: Future[ByteString]): Future[Unit] = {
  imageBytes.onComplete{
    case Success(res) => system.log.info(s"post frame. ${res.length} bytes")
    case Failure(_) => system.log.error("failed to post image!")
  }
  Future(Unit)
}

1 Ответ

0 голосов
/ 09 января 2019

Мне кажется, я нашел ответ на Документах Akka с использованием отложенного перезапуска с оператором отката . Вместо того, чтобы получать данные напрямую из нестабильного удаленного соединения, я использую RestartSource.withBackoff и , а не RestartSource.onFailureWithBackoff. Модифицированный поток выглядит так:

val restartSource = RestartSource.withBackoff(
  minBackoff = 100.milliseconds,
  maxBackoff = 1.seconds,
  randomFactor = 0.2
){ () =>
  Source.single(HttpRequest(uri = sourceUri))
    .via(sourceConnectionFlow)
    .via(byteFlow)
    .mapAsync(1)(postFrame)
}
restartSource
  .runWith(Sink.ignore)
  .onComplete{
    x => {
      println(x)
      system.terminate()
    }
 } 

Мне не удалось найти источник проблемы, но, похоже, это сработает.

...