Встроенные тесты Kafka случайно не работают - PullRequest
0 голосов
/ 16 июня 2020

Я реализовал кучу интеграционных тестов с использованием EmbededKafka для тестирования одного из наших потоковых приложений Kafka, работающих с использованием фреймворка spring-kafka. он во внутреннее хранилище состояний, выполняет некоторые преобразования и отправляет его другому микросервису в запрошенный topi c. Когда ответ возвращается в ответный topi c, он извлекает исходное сообщение из хранилища состояний и в зависимости от некоторых бизнес-логов c перенаправляет его в одну из наших подчиненных систем, каждая из которых имеет свой собственный topi c.

Интеграционные тесты просто проверяют различные варианты бизнес-условий.

Первоначально тесты были разделены на классы. При запуске сборки тесты из одного класса конфликтовали с тестами из другого класса с некоторыми конфликтными исключениями. Я не тратил на это много времени и просто перенес все тесты в один класс. Это устранило мою проблему со всеми тестами, проходящими либо из сборки gradle, либо из Intelij EDI.

Вот тест:

package au.nab.tlm.streams.integration;

import au.nab.tlm.streams.serde.EntitlementsCheckSerDes;
import au.nab.tlm.streams.test.support.MockEntitlementsCheckSerDes;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;

@SpringBootTest
@ContextConfiguration(classes = {MyTopologiesIntegrationTest.TestKafkaConfig.class})
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@EmbeddedKafka(
        ports = 9092,
        partitions = 1,
        topics = {
                "topic-1.v1",
                "topic-2.v1",
                "topic-3.v1",
                "topic-4.v1",
                "topic-5.v1",
                "topic-6.v1",
        },
        brokerProperties = {"transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1", "log.dir=/tmp/embedded-kafka"}
)
public class MyTopologiesIntegrationTest {
    @Autowired
    EmbeddedKafkaBroker kafkaBroker;

    @Autowired
    EntitlementsCheckSerDes appSerDes;

    @Test
    public void test_1() {
    }

    @Test
    public void test_2() {
    }

    @Test
    public void test_3() {
    }

    @Test
    public void test_4() {
    }

    @Test
    public void test_5() {
    }

    @TestConfiguration
    public static class TestKafkaConfig {
        @Bean
        EntitlementsCheckSerDes appSerDes() {
            return new MockEntitlementsCheckSerDes();
        }
    }
}

Доволен результатом. Я внес изменения, просто чтобы заметить сборку. сбой на нашем сервере CI. Запустив его еще раз, он снова потерпел неудачу, на этот раз с другими сбоями, чем в первый раз. Я попросил коллегу взглянуть, и у него был такой же отказ, как и с сервером CI. Я запускал сборку как минимум двадцать раз на своей машине, и она всегда проходила. Один за другим тесты моего коллеги тоже всегда проходили. любящий или аналогичный. Все эти исключения указывали на то, что встроенный Kafka, использованный в предыдущем тесте, не был полностью отключен до начала следующего теста, несмотря на использование аннотации DirtiesContext. Самый первый запускаемый тест всегда проходил успешно.

Мы оба целый день стряхивали волосы, это было никаким способом заставить их работать. Мы пробовали что угодно и куда бы нас ни привел Google, безуспешно. В конце концов, мы оставили в тестовом классе только один тестовый сценарий (с наибольшим количеством взаимодействий) и отключили остальные. хочу понять, что мы сделали не так и как это исправить.

Заранее благодарим вас за ваш вклад.

1 Ответ

0 голосов
/ 16 июня 2020

Не использовать фиксированный порт ports = 9092, - по умолчанию встроенная кафка будет прослушивать случайный порт, выбранный операционной системой.

Вы должны использовать это для своих тестовых случаев.

Адреса брокеров можно узнать по телефону this.kafkaBroker.getBrokersAsString().

...