Я должен проверить мой код на предмет потребления всех сообщений от kafka-сервера через встроенный 'withRunningKafka', как показано здесь: https://github.com/manub/scalatest-embedded-kafka
- Я пытался отправить сообщение в тему через созданныйвстроенный производитель.
- И я пытался потреблять созданные сообщения (созданные встроенным производителем) через мой код в проекте.
«тестирование с использованием собственного производителя и потребителя» должно{
"work" in {
withRunningKafka {
1. val producer: KafkaProducer[String, String] =
aKafkaProducer[String](valueSerializer, config)
val topic = "topic-to-test"
producer.send(new ProducerRecord[String, String](topic, "some message 1"))
producer.send(new ProducerRecord[String, String](topic, "some message 2"))
producer.close()
2. val ok: Future[Done] = Consumer
.committableSource(
consumerSettings,
Subscriptions.topics(topic))
.map(msg => println(msg.record.value()))
.runWith(Sink.ignore)
ok should be (Done)
}
}}
Проблема здесь: «ОК» не дает результат как «Готово».Вообще, правильна ли моя логика для проверки потребителя?