Как настроить тест Spring Kafka с помощью EmbeddedKafkaRule / EmbeddedKafka, чтобы исправить прерывистую ошибку TopicExistsException? - PullRequest
0 голосов
/ 26 января 2020

У меня были проблемы с тестированием моего потребителя и производителя Kafka. Интеграционные тесты периодически терпят неудачу с TopicExistsException.

Вот так выглядит мой текущий тестовый класс - UserEventListenerTest для одного из потребителей:

@SpringBootTest(properties = ["application.kafka.user-event-topic=user-event-topic-UserEventListenerTest",
    "application.kafka.bootstrap=localhost:2345"])
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class UserEventListenerTest {
    private val logger: Logger = LoggerFactory.getLogger(javaClass)

    @Value("\${application.kafka.user-event-topic}")
    private lateinit var userEventTopic: String

    @Autowired
    private lateinit var kafkaConfigProperties: KafkaConfigProperties

    private lateinit var embeddedKafka: EmbeddedKafkaRule
    private lateinit var sender: KafkaSender<String, UserEvent>
    private lateinit var receiver: KafkaReceiver<String, UserEvent>

    @BeforeAll
    fun setup() {
        embeddedKafka = EmbeddedKafkaRule(1, false, userEventTopic)
        embeddedKafka.kafkaPorts(kafkaConfigProperties.bootstrap.substringAfterLast(":").toInt())
        embeddedKafka.before()

        val producerProps: HashMap<String, Any> = hashMapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrap,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "com.project.userservice.config.MockAvroSerializer"
        )
        val senderOptions = SenderOptions.create<String, UserEvent>(producerProps)
        sender = KafkaSender.create(senderOptions)

        val consumerProps: HashMap<String, Any> = hashMapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrap,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer",
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to kafkaConfigProperties.deserializer,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
            "schema.registry.url" to kafkaConfigProperties.schemaRegistry,
            ConsumerConfig.GROUP_ID_CONFIG to "test-consumer"
        )
        val receiverOptions = ReceiverOptions.create<String, UserEvent>(consumerProps)
            .subscription(Collections.singleton("some-topic-after-UserEvent"))
        receiver = KafkaReceiver.create(receiverOptions)
    }
}

// Some tests
// Not shown as they are irrelevant
...
...
...

Класс UserEventListener потребляет сообщение от user-event-topic-UserEventListenerTest и публикует сообщение для some-topic-after-UserEvent.

Как вы можете видеть из установки, у меня есть тестовый производитель, который публикует sh сообщение на user-event-topic-UserEventListenerTest, чтобы я мог проверьте, использует ли UserEventListener сообщение и тестового потребителя, который будет использовать сообщение из some-topic-after-UserEvent, чтобы я мог видеть, публикует ли UserEventListener сообщение для some-topic-after-UserEvent после обработки записи.

KafkaConfigProperties класс выглядит следующим образом.

@Component
@ConfigurationProperties(prefix = "application.kafka")
data class KafkaConfigProperties(
    var bootstrap: String = "",
    var schemaRegistry: String = "",
    var deserializer: String = "",
    var userEventTopic: String = "",
)

И application.yml выглядит следующим образом.

application:
  kafka:
    user-event-topic: "platform.user-events.v1"
    bootstrap: "localhost:9092"
    schema-registry: "http://localhost:8081"
    deserializer: com.project.userservice.config.MockAvroDeserializer

Журналы ошибок

com.project.userservice.user.UserEventListenerTest > initializationError FAILED
    kafka.common.KafkaException:
        at org.springframework.kafka.test.EmbeddedKafkaBroker.createTopics(EmbeddedKafkaBroker.java:354)
        at org.springframework.kafka.test.EmbeddedKafkaBroker.lambda$createKafkaTopics$4(EmbeddedKafkaBroker.java:341)
        at org.springframework.kafka.test.EmbeddedKafkaBroker.doWithAdmin(EmbeddedKafkaBroker.java:368)
        at org.springframework.kafka.test.EmbeddedKafkaBroker.createKafkaTopics(EmbeddedKafkaBroker.java:340)
        at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:284)
        at org.springframework.kafka.test.rule.EmbeddedKafkaRule.before(EmbeddedKafkaRule.java:114)
        at com.project.userservice.user.UserEventListenerTest.setup(UserEventListenerTest.kt:62)

        Caused by:
        java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'user-event-topic-UserEventListenerTest' already exists.
            at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
            at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
            at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
            at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
            at org.springframework.kafka.test.EmbeddedKafkaBroker.createTopics(EmbeddedKafkaBroker.java:351)
            ... 6 more

            Caused by:
            org.apache.kafka.common.errors.TopicExistsException: Topic 'user-event-topic-UserEventListenerTest' already exists.

Что я пробовал :

  • Используйте разные bootstrap адреса сервера в каждом тесте, указав конфигурацию bootstrap, например, @SpringBootTest(properties = ["application.kafka.bootstrap=localhost:2345"])
  • Используйте разные имена topi c в каждом тесте путем перезаписи конфигурация topi c через @SpringBootTest точно так же, как сервер bootstrap перезаписывает e предыдущий пункт
  • Добавить @DirtiesContext к каждому тестовому классу

Версии пакета

  • Kotlin 1.3.61
  • Spring Boot - 2.2.3.RELEASE
  • io.projectreactor.kafka: реактор-кафка: 1.2.2.RELEASE
  • org.springframework.kafka: пружина-кафка-тест: 2.3.4 .RELEASE (только реализация теста)

Проблема

У меня есть несколько классов тестирования, которые используют EmbeddedKafkaRule и настроены более или менее одинаково. Для каждого из них я указываю разные адреса сервера kafka bootstrap и имена topi c, но периодически вижу исключения TopicAlreadyExists.

Что можно сделать, чтобы сделать тестовые классы согласованными?

1 Ответ

1 голос
/ 26 января 2020

Я указываю разные кафки bootstrap адрес сервера и топи c имена, но я все еще периодически вижу исключения TopicAlreadyExists

Это не имеет смысла; если у них каждый раз новый порт и особенно новые имена topi c, то topi c (s) уже не существует.

Некоторые предложения:

  1. Поскольку вы используете JUnit5, не используйте JUnit4 EmbeddedKafkaRule, вместо этого используйте EmbeddedKafkaBroker; или просто добавьте @EmbeddedKafka, и посредник будет добавлен в качестве компонента к контексту приложения Spring и его жизненному циклу, управляемому Spring (используйте @DirtiesContext для уничтожения); для тестов, не относящихся к Spring, посредник будет создан (и уничтожен) JUnit5 EmbeddedKafkaCondition и доступен через EmbeddedKafkaCondition.getBroker().
  2. Не использовать явные порты; позвольте брокеру использовать свой случайный порт по умолчанию и использовать embeddedKafka.getBrokersAsString() для свойства bootstrap servers.
  3. Если вам нужно управлять брокерами самостоятельно (в @BeforeAll), destroy() их в @AfterAll.
...