Интегрируй кафку Consumer spring batch - PullRequest
1 голос
/ 24 января 2020

У меня есть Kafka Consumer, разработанный в весенней загрузке, и я могу читать сообщения из topi c. Я хочу интегрировать его с пакетом Spring, потому что я хочу создать пакетный файл. Я не уверен, как это сделать.

Ответы [ 2 ]

2 голосов
/ 24 января 2020

Spring Batch добавлена ​​поддержка чтения / записи данных из / в темы Kafka в v4.2 , см. KafkaItemReader и KafkaItemWriter .

Вы также можете взглянуть на Spring Tips взнос о поддержке Kafka в Spring Batch от Jo sh Long.

0 голосов
/ 24 января 2020

Попробуйте как показано ниже:

private static final Logger LOG = LoggerFactory.getLogger(Listener.class);
@KafkaListener(id = "batch-listener", topics = "${app.topic.batch}")
public void receive(@Payload List<String> messages,
                    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                    @Header(KafkaHeaders.OFFSET) List<Long> offsets) {

    LOG.info("- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -");
    LOG.info("beginning to consume batch messages");

    for (int i = 0; i < messages.size(); i++) {

        LOG.info("received message='{}' with partition-offset='{}'",
                messages.get(i), partitions.get(i) + "-" + offsets.get(i));

    }
    LOG.info("all batch messages consumed");
}



 @EnableKafka
 @Configuration
 public class ListenerConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
    return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.getContainerProperties().setBatchErrorHandler(new BatchLoggingErrorHandler());
    return factory;
}

}

ref: https://memorynotfound.com/spring-kafka-batch-listener-example/

...