У меня есть локальный кластер кафки только одного брокера.У меня есть два потока akka:
один поток периодически генерирует сообщение, содержащее текущую метку времени, и сохраняет его в теме kafka
Другой поток подписанк этой теме и вычисляет задержку.
Похоже, что сообщения создаются только при создании следующего сообщения в потоке.Вот задержки, которые я наблюдаю при публикации одного сообщения в две секунды (в мс):
total: 2019. Before persist: 2001, after: 18
total: 2019. Before persist: 2002, after: 17
total: 2019. Before persist: 2002, after: 17
total: 2019. Before persist: 2002, after: 17
total: 2019. Before persist: 2002, after: 17
total: 2019. Before persist: 2002, after: 17
total: 2020. Before persist: 2002, after: 18
total: 2019. Before persist: 2001, after: 18
total: 2019. Before persist: 2002, after: 17
total: 2019. Before persist: 2002, after: 17
total: 2019. Before persist: 2002, after: 17
Если я увеличу период до пяти секунд, задержки соответственно увеличатся (+ -5000 мс).
Вот код потока создания akka:
val producer =
Source.fromIterator(() => Iterator.continually())
.throttle(1, 5.second)
.map { value =>
val currentTime = System.currentTimeMillis()
new ProducerRecord[ByteBuffer, ByteBuffer]("kafka-streams-input", ByteBuffer.wrap(currentTime.toString.getBytes))
}
.runWith(Producer.plainSink(KafkaClient.producerSettings))