У меня были проблемы с тестированием моего потребителя и производителя Kafka. Интеграционные тесты периодически терпят неудачу с TopicExistsException
.
Вот так выглядит мой текущий тестовый класс - UserEventListenerTest
для одного из потребителей:
@SpringBootTest(properties = ["application.kafka.user-event-topic=user-event-topic-UserEventListenerTest",
"application.kafka.bootstrap=localhost:2345"])
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class UserEventListenerTest {
private val logger: Logger = LoggerFactory.getLogger(javaClass)
@Value("\${application.kafka.user-event-topic}")
private lateinit var userEventTopic: String
@Autowired
private lateinit var kafkaConfigProperties: KafkaConfigProperties
private lateinit var embeddedKafka: EmbeddedKafkaRule
private lateinit var sender: KafkaSender<String, UserEvent>
private lateinit var receiver: KafkaReceiver<String, UserEvent>
@BeforeAll
fun setup() {
embeddedKafka = EmbeddedKafkaRule(1, false, userEventTopic)
embeddedKafka.kafkaPorts(kafkaConfigProperties.bootstrap.substringAfterLast(":").toInt())
embeddedKafka.before()
val producerProps: HashMap<String, Any> = hashMapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrap,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "com.project.userservice.config.MockAvroSerializer"
)
val senderOptions = SenderOptions.create<String, UserEvent>(producerProps)
sender = KafkaSender.create(senderOptions)
val consumerProps: HashMap<String, Any> = hashMapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrap,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to kafkaConfigProperties.deserializer,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
"schema.registry.url" to kafkaConfigProperties.schemaRegistry,
ConsumerConfig.GROUP_ID_CONFIG to "test-consumer"
)
val receiverOptions = ReceiverOptions.create<String, UserEvent>(consumerProps)
.subscription(Collections.singleton("some-topic-after-UserEvent"))
receiver = KafkaReceiver.create(receiverOptions)
}
}
// Some tests
// Not shown as they are irrelevant
...
...
...
Класс UserEventListener
потребляет сообщение от user-event-topic-UserEventListenerTest
и публикует сообщение для some-topic-after-UserEvent
.
Как вы можете видеть из установки, у меня есть тестовый производитель, который публикует sh сообщение на user-event-topic-UserEventListenerTest
, чтобы я мог проверьте, использует ли UserEventListener
сообщение и тестового потребителя, который будет использовать сообщение из some-topic-after-UserEvent
, чтобы я мог видеть, публикует ли UserEventListener
сообщение для some-topic-after-UserEvent
после обработки записи.
KafkaConfigProperties
класс выглядит следующим образом.
@Component
@ConfigurationProperties(prefix = "application.kafka")
data class KafkaConfigProperties(
var bootstrap: String = "",
var schemaRegistry: String = "",
var deserializer: String = "",
var userEventTopic: String = "",
)
И application.yml
выглядит следующим образом.
application:
kafka:
user-event-topic: "platform.user-events.v1"
bootstrap: "localhost:9092"
schema-registry: "http://localhost:8081"
deserializer: com.project.userservice.config.MockAvroDeserializer
Журналы ошибок
com.project.userservice.user.UserEventListenerTest > initializationError FAILED
kafka.common.KafkaException:
at org.springframework.kafka.test.EmbeddedKafkaBroker.createTopics(EmbeddedKafkaBroker.java:354)
at org.springframework.kafka.test.EmbeddedKafkaBroker.lambda$createKafkaTopics$4(EmbeddedKafkaBroker.java:341)
at org.springframework.kafka.test.EmbeddedKafkaBroker.doWithAdmin(EmbeddedKafkaBroker.java:368)
at org.springframework.kafka.test.EmbeddedKafkaBroker.createKafkaTopics(EmbeddedKafkaBroker.java:340)
at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:284)
at org.springframework.kafka.test.rule.EmbeddedKafkaRule.before(EmbeddedKafkaRule.java:114)
at com.project.userservice.user.UserEventListenerTest.setup(UserEventListenerTest.kt:62)
Caused by:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'user-event-topic-UserEventListenerTest' already exists.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
at org.springframework.kafka.test.EmbeddedKafkaBroker.createTopics(EmbeddedKafkaBroker.java:351)
... 6 more
Caused by:
org.apache.kafka.common.errors.TopicExistsException: Topic 'user-event-topic-UserEventListenerTest' already exists.
Что я пробовал :
- Используйте разные bootstrap адреса сервера в каждом тесте, указав конфигурацию bootstrap, например,
@SpringBootTest(properties = ["application.kafka.bootstrap=localhost:2345"])
- Используйте разные имена topi c в каждом тесте путем перезаписи конфигурация topi c через
@SpringBootTest
точно так же, как сервер bootstrap перезаписывает e предыдущий пункт - Добавить
@DirtiesContext
к каждому тестовому классу
Версии пакета
- Kotlin 1.3.61
- Spring Boot - 2.2.3.RELEASE
- io.projectreactor.kafka: реактор-кафка: 1.2.2.RELEASE
- org.springframework.kafka: пружина-кафка-тест: 2.3.4 .RELEASE (только реализация теста)
Проблема
У меня есть несколько классов тестирования, которые используют EmbeddedKafkaRule
и настроены более или менее одинаково. Для каждого из них я указываю разные адреса сервера kafka bootstrap и имена topi c, но периодически вижу исключения TopicAlreadyExists.
Что можно сделать, чтобы сделать тестовые классы согласованными?