Я создал группу тестов (JUnit 5) со встроенным kafka (spring-kafka-test), и когда я запускаю их иногда (не всегда), я получаю " Topi c 'some_name' уже существует"на один или несколько тестов за один прогон.
Все тесты используют одно и то же имя topi c (я не хочу менять это имя для каждого теста), класс теста имеет аннотацию DirtiesContext (AFTER_EACH_TEST_METHOD). Я не уверен, что является причиной этой проблемы и как ее решить.
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"some_name"}, ports = 9092)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@ActiveProfiles("test")
public class RemovalKafkaTestIT {
private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, true, TOPIC);
private final static String SERVER_ADDRES = "127.0.0.1:9092";
private Consumer<String, String> prepareConsumer() {
Map<String, Object> configsConsumer = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
configsConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configsConsumer.put("bootstrap.servers", SERVER_ADDRES);
Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(configsConsumer, new StringDeserializer(), new StringDeserializer()).createConsumer();
consumer.subscribe(singleton("some_name"));
return consumer;
}
@Test
public void someMethodWithKafka1() {
// some logic
...
// check topic content
Consumer<String, String> consumer = this.prepareConsumer();
embeddedKafkaBroker.consumeFromEmbeddedTopics(consumer, "some_name");
ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
assertThat(records.count()).isEqualTo(1); // and other checks :)
// clean
consumer.commitSync();
consumer.close();
}
@Test
public void someMethodWithKafka2() {
// some other logic
...
// check topic content
Consumer<String, String> consumer = this.prepareConsumer();
embeddedKafkaBroker.consumeFromEmbeddedTopics(consumer, "some_name");
ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
assertThat(records.count()).isEqualTo(1); // and other checks :)
// clean
consumer.commitSync();
consumer.close();
}
}