Как протестировать Kafka Consumer с Embedded-Kafka-lib, именно с 'withRunningKafka'? - PullRequest
0 голосов
/ 31 января 2019

Я должен проверить мой код на предмет потребления всех сообщений от kafka-сервера через встроенный 'withRunningKafka', как показано здесь: https://github.com/manub/scalatest-embedded-kafka

  1. Я пытался отправить сообщение в тему через созданныйвстроенный производитель.
  2. И я пытался потреблять созданные сообщения (созданные встроенным производителем) через мой код в проекте.

«тестирование с использованием собственного производителя и потребителя» должно{

"work" in {

    withRunningKafka {

      1. val producer: KafkaProducer[String, String] =
               aKafkaProducer[String](valueSerializer, config)

         val topic = "topic-to-test"

         producer.send(new ProducerRecord[String, String](topic, "some message 1"))
         producer.send(new ProducerRecord[String, String](topic, "some message 2"))
         producer.close()

      2. val ok: Future[Done] = Consumer
        .committableSource(
            consumerSettings,
            Subscriptions.topics(topic))
        .map(msg => println(msg.record.value()))
        .runWith(Sink.ignore)

       ok should be (Done)
    }
}}

Проблема здесь: «ОК» не дает результат как «Готово».Вообще, правильна ли моя логика для проверки потребителя?

Ответы [ 2 ]

0 голосов
/ 31 января 2019

Я думаю, что вы столкнулись с двумя проблемами одновременно:

  1. Потребитель Kafka бесконечно ждет элементов (как говорит @dvim), поэтому вам нужно .take ()чтобы он действительно заканчивался

  2. Группа потребителей kafka по умолчанию будет начинаться в конце текущей темы, а не в начале, и, следовательно, не будет принимать сообщения, опубликованные до ее вращения,Вам нужно, чтобы настройки начинались в начале темы, а не в конце.

0 голосов
/ 31 января 2019

Добро пожаловать в stackoverflow!

Причина ok никогда не завершается с результатом, потому что источник ожидает возможных дальнейших сообщений.Добавьте .take(2) перед картой, и источник остановится после того, как два элемента позволят завершить ok future.

...