Spring Kafka Embedded - topi c уже существует между тестами - PullRequest
0 голосов
/ 23 января 2020

Я создал группу тестов (JUnit 5) со встроенным kafka (spring-kafka-test), и когда я запускаю их иногда (не всегда), я получаю " Topi c 'some_name' уже существует"на один или несколько тестов за один прогон.

Все тесты используют одно и то же имя topi c (я не хочу менять это имя для каждого теста), класс теста имеет аннотацию DirtiesContext (AFTER_EACH_TEST_METHOD). Я не уверен, что является причиной этой проблемы и как ее решить.

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"some_name"}, ports = 9092)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@ActiveProfiles("test")
public class RemovalKafkaTestIT {
    private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, true, TOPIC);
    private final static String SERVER_ADDRES = "127.0.0.1:9092";

    private Consumer<String, String> prepareConsumer() {
        Map<String, Object> configsConsumer = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        configsConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configsConsumer.put("bootstrap.servers", SERVER_ADDRES);
        Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(configsConsumer, new StringDeserializer(), new StringDeserializer()).createConsumer();
        consumer.subscribe(singleton("some_name"));
        return consumer;
    }

    @Test
    public void someMethodWithKafka1() {
        // some logic
        ...
        // check topic content 
        Consumer<String, String> consumer = this.prepareConsumer();
        embeddedKafkaBroker.consumeFromEmbeddedTopics(consumer, "some_name");

        ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
        assertThat(records.count()).isEqualTo(1); // and other checks :)

        // clean
        consumer.commitSync();
        consumer.close();
    }

    @Test
    public void someMethodWithKafka2() {
        // some other logic
        ...
        // check topic content 
        Consumer<String, String> consumer = this.prepareConsumer();
        embeddedKafkaBroker.consumeFromEmbeddedTopics(consumer, "some_name");

        ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
        assertThat(records.count()).isEqualTo(1); // and other checks :)

        // clean
        consumer.commitSync();
        consumer.close();
    }
}

Ответы [ 2 ]

1 голос
/ 23 января 2020

У вас есть два брокера; один, который вы создаете сами:

private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, true, TOPIC);

и один, управляемый Spring:

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"some_name"}, ports = 9092)

Когда вы используете @EmbeddedKafka с контекстом теста Spring; брокер добавляется в контекст.

Измените его на

@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;

и не добавляйте другой компонент.

Как правило, его проще (и быстрее) использовать разные топи c для каждого теста; избегая создания брокера для каждого теста.

EDIT

ports = 9092

Вместо этого используйте случайный порт (пропустите это конфигурации) и использовать

configsConsumer.put("bootstrap.servers", this.embeddedKafkaBroker.getBrokersAsString());
0 голосов
/ 23 января 2020

Попробуйте пометить свой EmbeddedKafkaBroker как компонент или с помощью @Autowire создать брокера из аннотации верхнего уровня.

Поскольку посредник не помечен как компонент, его жизненный цикл не будет управляться контекстом приложения, и он не будет очищен между запусками от @DirtiesContext. Возможно, он удерживает что-то, что удерживает топи c за пределами границ теста.

...