Моя цель - использовать тестовые контейнеры kafka с контекстом весенней загрузки в тестах без @DirtiesContext
. Проблема в том, что без запуска контейнера отдельно для каждого тестового класса я не представляю, как использовать сообщения, которые были созданы только определенным тестовым классом или методом. Поэтому я в конечном итоге потребляю сообщения, которые не были частью даже тестового класса, который работает.
Одним из решений может быть очистка topi c сообщений. Я понятия не имею, как это сделать, я попытался перезапустить контейнер, но затем следующий тест не смог подключиться к kafka.
Второе решение, которое я имел в виду, - это создать пользователя, который будет создан в начало метода тестирования и как-то записывать сообщения от последних, в то время как другие сотрудники в тестировании будут вызваны. Я был в состоянии сделать это с Embeded Kafka, я не знаю, как это сделать с помощью тестовых контейнеров.
Текущая конфигурация выглядит следующим образом:
@TestConfiguration
public class KafkaContainerConfig {
@Bean(initMethod = "start", destroyMethod = "stop")
public KafkaContainer kafkaContainer() {
return new KafkaContainer("5.0.3");
}
@Bean
public KafkaAdmin kafkaAdmin(KafkaProperties kafkaProperties, KafkaContainer kafkaContainer) {
kafkaProperties.setBootstrapServers(List.of(kafkaContainer.getBootstrapServers()));
return new KafkaAdmin(kafkaProperties.buildAdminProperties());
}
}
С аннотацией, которая обеспечит приведенная выше конфигурация
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import(KafkaContainerConfig.class)
@EnableAutoConfiguration(exclude = TestSupportBinderAutoConfiguration.class)
@TestPropertySource("classpath:/application-test.properties")
@DirtiesContext
public @interface IncludeKafkaTestContainer {
}
И в самом тестовом классе с несколькими такими конфигурациями это будет выглядеть так:
@IncludeKafkaTestContainer
@IncludePostgresTestContainer
@SpringBootTest(webEnvironment = RANDOM_PORT)
class SomeTest {
...
}
В настоящее время потребитель в методе теста создается следующим образом:
KafkaConsumer<String, String> kafkaConsumer = createKafkaConsumer("topic_name");
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
List<ConsumerRecord<String, String>> topicMsgs = Lists.newArrayList(consumerRecords.iterator());
А:
public static KafkaConsumer<String, String> createKafkaConsumer(String topicName) {
Properties properties = new Properties();
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup_" + topicName);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(List.of(topicName));
return kafkaConsumer;
}