Akka streams kafka создает записи только при наличии следующей записи - PullRequest
0 голосов
/ 28 февраля 2019

У меня есть локальный кластер кафки только одного брокера.У меня есть два потока akka:

  • один поток периодически генерирует сообщение, содержащее текущую метку времени, и сохраняет его в теме kafka

  • Другой поток подписанк этой теме и вычисляет задержку.

Похоже, что сообщения создаются только при создании следующего сообщения в потоке.Вот задержки, которые я наблюдаю при публикации одного сообщения в две секунды (в мс):

total: 2019. Before persist: 2001, after: 18
total: 2019. Before persist: 2002, after: 17
total: 2019. Before persist: 2002, after: 17
total: 2019. Before persist: 2002, after: 17
total: 2019. Before persist: 2002, after: 17
total: 2019. Before persist: 2002, after: 17
total: 2020. Before persist: 2002, after: 18
total: 2019. Before persist: 2001, after: 18
total: 2019. Before persist: 2002, after: 17
total: 2019. Before persist: 2002, after: 17
total: 2019. Before persist: 2002, after: 17 

Если я увеличу период до пяти секунд, задержки соответственно увеличатся (+ -5000 мс).

Вот код потока создания akka:


  val producer =
    Source.fromIterator(() => Iterator.continually())
      .throttle(1, 5.second)
      .map { value =>
        val currentTime = System.currentTimeMillis()
        new ProducerRecord[ByteBuffer, ByteBuffer]("kafka-streams-input", ByteBuffer.wrap(currentTime.toString.getBytes))
      }
      .runWith(Producer.plainSink(KafkaClient.producerSettings))
...