Spring Boot Embedded Kafka не может подключиться - PullRequest
3 голосов
/ 03 апреля 2019

Я пытаюсь написать интеграционный тест для моего потребителя Kafka.Я выполнил официальную справочную документацию , но когда я начинаю свой тест, я вижу только это повторное объявление до бесконечности:

-2019-04-03 15: 47: 34.002ПРЕДУПРЕЖДЕНИЕ 13120 --- [main] org.apache.kafka.clients.NetworkClient: [Consumer clientId = consumer-1, groupId = my-group] Не удалось установить соединение с узлом -1.Брокер может быть недоступен.

Что я делаю не так?

Я использую JUnit5, Spring Boot и spring-kafka и spring-kafka-test.

У меня есть @EnableKafka аннотация для моего @Configuration класса.

Вот так выглядит мой тестовый класс:

@ExtendWith(SpringExtension::class)
@SpringBootTest(classes = [TestKafkaConfig::class])
@DirtiesContext
@EmbeddedKafka(
        partitions = 1,
        topics = [KafkaIntegrationTest.MY_TOPIC])
class KafkaIntegrationTest {

    @Autowired
    private lateinit var embeddedKafka: EmbeddedKafkaBroker

    @Test
    fun test() {
        val senderProps = KafkaTestUtils.senderProps(embeddedKafka.brokersAsString)
        val template = KafkaTemplate(DefaultKafkaProducerFactory<Int, String>(senderProps))
        template.defaultTopic = KafkaIntegrationTest.MY_TOPIC
        template.sendDefault("foo")
    }
}

my application.yml выглядит так:

kafka:
  consumer:
    group-id: my-group
    bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:9092}
    value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
      specific.avro.reader: true

Я также пытался настроить MockSchemaRegistryClient, но получаю точно такое же повторное сообщение.(Вот как я пытался настроить MockSchemaRegistryClient):

@TestConfiguration
@Import(TestConfig::class)
class TestKafkaConfig {

    @Autowired
    private lateinit var props: KafkaProperties

    @Bean
    fun schemaRegistryClient() = MockSchemaRegistryClient()

    @Bean
    fun kafkaAvroSerializer() = KafkaAvroSerializer(schemaRegistryClient())

    @Bean
    fun kafkaAvroDeserializer() = KafkaAvroDeserializer(schemaRegistryClient(), props.buildConsumerProperties())

    @Bean
    fun producerFactory(): ProducerFactory<*, *> = DefaultKafkaProducerFactory(
            props.buildProducerProperties(),
            StringSerializer(),
            kafkaAvroSerializer())

    @Bean
    fun consumerFactory(): ConsumerFactory<*, *> = DefaultKafkaConsumerFactory(
            props.buildConsumerProperties(),
            StringDeserializer(),
            kafkaAvroDeserializer()
    )

    @Bean
    fun kafkaListenerContainerFactory() = ConcurrentKafkaListenerContainerFactory<Any, Any>().apply {
        setConsumerFactory(consumerFactory() as ConsumerFactory<in Any, in Any>?)
    }

}

Что я делаю не так? Обратите внимание, что Я использую реестр Confluent Schema и пытаюсь десериализовать из Avro.

Я пытаюсь проверить, работает ли мой потребитель или нет, и выглядит так:

open class SomeConsumer(private val someUseCase) {

    @KafkaListener(topics = ["\${kafka.some-topic}"])
    open fun processMessage(record: ConsumerRecord<String, SomeObject>) {
        someUseCase.call(record)
    }
}

1 Ответ

4 голосов
/ 08 апреля 2019

Я полагаю, что вы не указали URL-адрес брокера для своих тестов.

В документации есть примечание о том, как получить это значение:

Когда встроенный Kafka и встроенный сервер Zookeeper запускаются EmbeddedKafkaBroker, системное свойство с именем spring.embedded.kafka.brokers устанавливается на адрес брокеров Kafka, а системное свойство с именем spring.embedded.zookeeper.connect имеет значение установить адрес Zookeeper. Для этого свойства предусмотрены удобные константы (EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS и EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT).

(находится в нижней части секции junit здесь )

Один из способов исправить это - установить kafka.consumers.bootstrap-servers на это значение в ваших тестах, например,

spring:
    kafka:
        consumer:
            bootstrap-servers: ${spring.embedded.kafka.brokers}
...