У меня есть довольно простое приложение, которое состоит из потребителя Kafka, сидящего за потоковым сервером Akka HTTP.После получения запроса сервер запускает нового получателя для указанного пользователя и начинает чтение сообщений из очереди:
def consumer(consumerGroup: String, from: Int) = {
val topicsAndDate = Subscriptions.assignmentOffsetsForTimes(partitions.map(_ -> (System.currentTimeMillis() - from)): _*)
Consumer.plainSource[String, GenericRecord](consumerSettings.withGroupId(consumerGroup), topicsAndDate)
.map(record => record.timestamp() -> messageFormat.from(record.value()))
.map {
//convert to json
}
}
def routes: Route = Route.seal(
pathSingleSlash {
complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "Say hello to akka-http"))
} ~
path("stream") {
//some logic to validate user
log.info("Received request from {} with 'from'={}", user, from)
complete(consumer(user, from))
})
startServer("0.0.0.0", 8080)
Служба работает нормально, пока получатель не достигнет последнего сообщения в очереди.Через шестьдесят секунд после возвращения этого последнего сообщения соединение с сервером каждый раз прерывается.Я хочу сохранить соединение, так как очередь заполняется большим количеством сообщений каждые пару минут.
Я пробовал различные варианты конфигурации, но ни один из них не дал желаемого результата.Моя текущая конфигурация выглядит следующим образом:
akka {
http {
client {
idle-timeout = 300s
}
server {
idle-timeout = 600s
linger-timeout = 15 min
}
host-connection-pool {
max-retries = 30
max-connections = 20
max-open-requests = 32
connecting-timeout = 60s
client {
idle-timeout = 300s
}
}
}
}
Я также пытался использовать настройку server.websocket.periodic-keep-alive-max-idle = 1 second
, но, похоже, это не имеет значения.
Дайте мне знать, если мне нужнопредоставить более актуальную информацию.