Akka HTTP Streaming server разрывает соединение после последнего сообщения из очереди - PullRequest
0 голосов
/ 22 февраля 2019

У меня есть довольно простое приложение, которое состоит из потребителя Kafka, сидящего за потоковым сервером Akka HTTP.После получения запроса сервер запускает нового получателя для указанного пользователя и начинает чтение сообщений из очереди:

def consumer(consumerGroup: String, from: Int) = {
  val topicsAndDate = Subscriptions.assignmentOffsetsForTimes(partitions.map(_ -> (System.currentTimeMillis() - from)): _*)

  Consumer.plainSource[String, GenericRecord](consumerSettings.withGroupId(consumerGroup), topicsAndDate)
    .map(record => record.timestamp() -> messageFormat.from(record.value()))
    .map {
      //convert to json
    }
}

def routes: Route = Route.seal(
  pathSingleSlash {
    complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "Say hello to akka-http"))
  } ~
  path("stream") {
    //some logic to validate user

    log.info("Received request from {} with 'from'={}", user, from)
    complete(consumer(user, from))  
  })

startServer("0.0.0.0", 8080)

Служба работает нормально, пока получатель не достигнет последнего сообщения в очереди.Через шестьдесят секунд после возвращения этого последнего сообщения соединение с сервером каждый раз прерывается.Я хочу сохранить соединение, так как очередь заполняется большим количеством сообщений каждые пару минут.

Я пробовал различные варианты конфигурации, но ни один из них не дал желаемого результата.Моя текущая конфигурация выглядит следующим образом:

akka {
  http {
    client {
      idle-timeout = 300s
    }
    server {
      idle-timeout = 600s
      linger-timeout = 15 min
    }
    host-connection-pool {
      max-retries = 30
      max-connections = 20
      max-open-requests = 32
      connecting-timeout = 60s
      client {
        idle-timeout = 300s
      }
    }
  }
}

Я также пытался использовать настройку server.websocket.periodic-keep-alive-max-idle = 1 second, но, похоже, это не имеет значения.

Дайте мне знать, если мне нужнопредоставить более актуальную информацию.

...