Spring Kafka Test - Не получает данные в @KafkaListener с EmbeddedKafka - PullRequest
4 голосов
/ 11 марта 2020

Мы проводим интеграционный тест для нашего приложения, использующего Cucumber, и у нас возникают некоторые проблемы при тестировании @KafkaListener. Нам удалось использовать EmbeddedKafka и вывести на него данные.

Но потребитель никогда не получает никаких данных, и мы не знаем, что происходит.

Это наш код:

Конфигурация производителя

@Configuration
@Profile("test")
public class KafkaTestProducerConfig {

    private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafka;

    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                embeddedKafka.getBrokersAsString());
        props.put(SCHEMA_REGISTRY_URL, "URL");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, GenericRecord> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, GenericRecord> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

Конфигурация потребителя

@Configuration
@Profile("test")
@EnableKafka
public class KafkaTestConsumerConfig {

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafka;

    private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    @Bean
    public Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        props.put(SCHEMA_REGISTRY_URL, "URL");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000);
        return props;
    }

    @Bean
    public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {
        KafkaAvroDeserializer avroDeserializer = new KafkaAvroDeserializer();
        avroDeserializer.configure(consumerProperties(), false);
        return new DefaultKafkaConsumerFactory<>(consumerProperties(), new StringDeserializer(), avroDeserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

}

Интеграционный тест

@SpringBootTest(
        webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
        classes = Application.class)
@ActiveProfiles("test")
@EmbeddedKafka(topics = {"TOPIC1", "TOPIC2", "TOPIC3"})
public class CommonStepDefinitions implements En {

    protected static final Logger LOGGER = LoggerFactory.getLogger(CommonStepDefinitions.class);

    @Autowired
    protected KafkaTemplate<String, GenericRecord> kafkaTemplate;

}

Определения шагов

public class KafkaStepDefinitions extends CommonStepDefinitions {

    private static final String TEMPLATE_TOPIC = "TOPIC1";

    public KafkaStepDefinitions(){
        Given("given statement", () -> {
            OperationEntity operationEntity = new OperationEntity();
            operationEntity.setFoo("foo");
            kafkaTemplate.send(TEMPLATE_TOPIC, AvroPojoTransformer.pojoToRecord(operationEntity));
        });
    }

}

Потребитель Этот же код работает нормально для рабочий сервер Bootstrap, но он никогда не был достигнут с помощью Embedded Kafka

@KafkaListener(topics = "${kafka.topic1}", groupId = "groupId")
    public void consume(List<GenericRecord> records, Acknowledgment ack) throws DDCException {
        LOGGER.info("Batch of {} records received", records.size());
        //do something with the data
        ack.acknowledge();
    }

Все в журналах выглядит нормально, но мы не знаем, чего не хватает.

Заранее спасибо .

Ответы [ 2 ]

0 голосов
/ 13 марта 2020

Ваш тест заканчивается до его начала; см. имя потока, содержащее 0-0- C -1; потребитель остановился менее чем через секунду после его запуска.

Я только что проверил, и нет, мой тест выполняет , потому что вы можете увидеть журнал значений ProducerConfig в строке 1174 из журнала. И этот журнал появляется сразу после kafkaTemplate.send (topi c, entity). Я не использую @Test, потому что в огурце у вас есть stepDefinitions. Вы можете увидеть код в моем сообщении.

OK; но вам нужна какая-то защелка в тесте, чтобы дождаться, пока потребитель фактически назначит темы / разделы и получит данные. Теперь, когда вы структурировали тест, тест завершается до того, как потребитель полностью запустится. Смотрите мой ответ на этот вопрос , чтобы узнать, как можно обернуть слушателя, чтобы вы могли дождаться получения записи. (Для этого используются обычные тесты JUnit.)

Другой метод - это как-то внедрить сервис в ваш bean-компонент-слушатель, который отсчитывает блокировку.

В качестве быстрого теста добавьте Thread.sleep(10_000) к вашему " step ".

Но, предположительно, вы захотите как-то утверждать, что потребитель действительно получил данные. Вы должны сделать это утверждение до завершения теста, и поскольку он асинхронный c, вам нужен какой-то механизм, чтобы дождаться его (или тайм-аут).

0 голосов
/ 12 марта 2020

Проблема в том, что потребитель не подключен к встроенной Kafka. Вы можете сделать это, запустив свои тесты с профилем test и добавив следующее к application-test.yml.

spring:
  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers}

Тогда вам также не понадобятся пользовательские consumerProperties, consumerFactory и kafkaListenerContainerFactory бобов. Весенняя загрузка автоматически подключит их для вас. Если вы используете wi sh для использования этих bean-компонентов (не знаю почему), вам следует дважды проверить KafkaAutoConfiguration, чтобы убедиться, что вы переопределяете правильные имена и типов.

...