Кафка для теста в контейнерах - PullRequest
0 голосов
/ 24 декабря 2018

Интегрированная кафка в мой весенний проект.Написали интеграцию с использованием подхода TestContainer, но время от времени тесты не дают результатов.Похоже, некоторые проблемы с инициализацией сервера Кафка.

Вот мой код ниже

def setupSpec() {
    kafka = new KafkaContainer()
    kafka.start()
    System.setProperty("kafka.consumer.endpoint", kafka.bootstrapServers.replace("PLAINTEXT://", ""))
}


def setup() {
    RestAssured.port = port
}

def "test profile update events"() {

    given:
    String INPUT_TOPIC = "EventXX"

    when:
    KafkaProducer<String, String> kafkaProducer = createProducer()
    kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()
    kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()
    kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()
    kafkaProducer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get().topic()

    then:
    TestUtil.waitFor({ EventConsumer.msgConsumed == true }, 5000)
    kafkaProducer.close()

}

Теперь интересно, если я добавлю Thread.sleep (10000) перед отправкой сообщения в тестеэто всегда работает, но такой подход мне кажется немного грязным.Как мы можем убедиться, что сервер kafka запущен и запущен перед выполнением любого теста.Я попробовал следующий подход, проверив kafkaSendRecieve в setupSpec , но не получилось.Я вставляю код ниже

def validatekafkaSendRecieve() {
    def started = false
    String INPUT_TOPIC = "kafkaTest"
    def producer = createProducer()
    def consumer = createConsumer(INPUT_TOPIC)
    Thread.sleep(9000)

    while (!started) {
        producer.send(new ProducerRecord<>(INPUT_TOPIC, "foo")).get()
        started = consumeMessage(INPUT_TOPIC, consumer)
    }
    producer.close()
    consumer.close()
}

def consumeMessage(String topic, KafkaConsumer kafkaConsumer) {
    def message = kafkaConsumer.poll(3)
    if (!message.isEmpty()) {
        def messageList = message.records(topic).asList()
        if (messageList != null && !message.isEmpty()) {
            return true
        }
    } else {
        return false
    }
}

1 Ответ

0 голосов
/ 20 марта 2019

Вы предварительно создаете тему, по которой вы производите?Попробуйте сделать это.Не уверен, что это проблема, с которой вы столкнулись (необходимы журналы), но когда тема создается автоматически, для всех ее разделов требуется некоторое время, чтобы назначить лидера.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...