Записать данные в Кафку в весеннюю партию - PullRequest
0 голосов
/ 25 сентября 2019

Свежий новичок в Кафке, недавно я пытаюсь получить данные из весенней партии, а затем написать в Кафку, но я понятия не имею, как это сделать.Может ли кто-нибудь помочь мне понять, как записать данные в Kafka?Вот демонстрационный код, который я пишу с помощью SpringBatch для получения данных:

@ Публичный класс конфигурации FileReader {

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier("flatFileWriter")
private ItemWriter<? super Demo1> flatFileWriter;
@Bean
public Job FileReaderJob() {
    return jobBuilderFactory.get("FileReaderJob").start(FileReaderStep()).build();
}
private Step FileReaderStep() {
    // TODO Auto-generated method stub
    return stepBuilderFactory.get("FileReaderStep").<Demo1,Demo1>chunk(100).reader(flatFileReader())
            .writer(flatFileWriter).build();
}

@Bean
@StepScope
public FlatFileItemReader<Demo1> flatFileReader() {
    // TODO Auto-generated method stub

    FlatFileItemReader<Demo1> reader = new FlatFileItemReader<Demo1>();
    reader.setResource(new  ClassPathResource("Demo1.csv"));
    reader.setLinesToSkip(1);
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    tokenizer.setNames(new String [] {"id","first","last"});
    DefaultLineMapper<Demo1> mapper = new DefaultLineMapper<>();
    mapper.setLineTokenizer(tokenizer);
    mapper.setFieldSetMapper(new FieldSetMapper<Demo1>() {

        @Override
        public Demo1 mapFieldSet(FieldSet fieldSet) throws BindException {
            Demo1 demo1 = new Demo1();
            demo1.setId(fieldSet.readLong("id"));
            demo1.setFirst(fieldSet.readString("first"));
            demo1.setLast(fieldSet.readString("last"));

            // TODO Auto-generated method stub
            return demo1;
        }
    });

    mapper.afterPropertiesSet();
    reader.setLineMapper(mapper);

    return reader;
}

}

1 Ответ

0 голосов
/ 25 сентября 2019

Предстоящая версия Spring Batch v4.2 GA обеспечит поддержку чтения / записи данных в разделах Apache Kafka.Вы уже можете попробовать его с 4.2.0.RC1 релизом .

. Для KafkaItemWriter вам потребуется настроить KafkaTemplate.Вот пример писателя:

@Bean
public KafkaItemWriter<String, Demo1> kafkaItemWriter(KafkaTemplate<String, Demo1> kafkaTemplate) {
    return new KafkaItemWriterBuilder<String, Demo1>()
            .kafkaTemplate(kafkaTemplate)
            .build();
}

Вы также можете взглянуть на статью Spring Tips о поддержке Кафки в Spring Batch Джоша Лонга.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...