У меня есть источник Akka Stream Kafka, который читает тему Kafka.
У меня есть простая задача, которая позволяет отключить фиксацию смещения сообщения.Фиксация обычно выполняется с помощью commitScaladsl.
Моя проблема в том, что я не знаю, как проверить, было ли смещение зафиксировано или нет.
Обычно мы используем EmbeddedKafka для тестирования, но у меня нетне нашел способ узнать последнее зафиксированное смещение.
Это пример написанного мною теста:
"KafkaSource" should {
"consume from a kafka topic and pass the message " in {
val commitToKafka = true
val key = "key".getBytes
val message = "message".getBytes
withRunningKafka {
val source = getKafkaSource(commitToKafka)
val (_, sub) = source
.toMat(TestSink.probe[CommittableMessage[Array[Byte], Array[Byte], ConsumerMessage.CommittableOffset]])(Keep.both)
.run()
val messageOpt = publishAndRequestRetry(topic, key, message, sub, retries)
messageOpt should not be empty
messageOpt.get.value shouldBe message
}
}
Теперь я хочу добавить проверку на смещениебыть совершенным или нет.