Как зафиксировать смещения при использовании KafkaItemReader в весеннем пакетном задании, когда все сообщения обработаны и записаны в файл .dat? - PullRequest
0 голосов
/ 13 апреля 2020

Я разработал Spring Batch Job, который читает из Kafka topi c с использованием класса KafkaItemReader. Я хочу зафиксировать смещение, только когда сообщения, прочитанные в определенном фрагменте, обработаны и успешно записаны в файл .dat.

@Bean
public Job kafkaEventReformatjob(
        @Qualifier("MaintStep") Step MainStep,
        @Qualifier("moveFileToFolder") Step moveFileToFolder,
        @Qualifier("compressFile") Step compressFile,
        JobExecutionListener listener)
{
    return jobBuilderFactory.get("kafkaEventReformatJob")
            .listener(listener)
            .incrementer(new RunIdIncrementer())
            .flow(MainStep)
            .next(moveFileToFolder)
            .next(compressFile)
            .end()
            .build();
}

@Bean
Step MainStep(
        ItemProcessor<IncomingRecord, List<Record>> flatFileItemProcessor,
        ItemWriter<List<Record>> flatFileWriter)
{
    return stepBuilderFactory.get("mainStep")
            .<InputRecord, List<Record>> chunk(5000)
            .reader(kafkaItemReader())
            .processor(flatFileItemProcessor)
            .writer(writer())
            .listener(basicStepListener)
            .build();
}
//Reader reads all the messages from akfka topic and sending back in form of IncomingRecord.
 @Bean
KafkaItemReader<String, IncomingRecord> kafkaItemReader() {
    Properties props = new Properties();
    props.putAll(this.properties.buildConsumerProperties());
    List<Integer> partitions = new ArrayList<>();
    partitions.add(0);
    partitions.add(1);
    return new KafkaItemReaderBuilder<String, IncomingRecord>()
            .partitions(partitions)
            .consumerProperties(props)
            .name("records")
            .saveState(true)
            .topic(topic)
            .pollTimeout(Duration.ofSeconds(40L))
            .build();
}

  @Bean
public ItemWriter<List<Record>> writer() {
    ListUnpackingItemWriter<Record> listUnpackingItemWriter = new ListUnpackingItemWriter<>();
    listUnpackingItemWriter.setDelegate(flatWriter());
    return listUnpackingItemWriter;
}

public ItemWriter<Record> flatWriter() {
    FlatFileItemWriter<Record> fileWriter = new FlatFileItemWriter<>();
    String tempFileName = "abc";
    LOGGER.info("Output File name " + tempFileName + " is in working directory ");
    String workingDir = service.getWorkingDir().toAbsolutePath().toString();
    Path outputFile = Paths.get(workingDir, tempFileName);
    fileWriter.setName("fileWriter");
    fileWriter.setResource(new FileSystemResource(outputFile.toString()));
    fileWriter.setLineAggregator(lineAggregator());
    fileWriter.setForceSync(true);
    fileWriter.setFooterCallback(customFooterCallback());
    fileWriter.close();
    LOGGER.info("Successfully created the file writer");
    return fileWriter;

}

@StepScope
@Bean
public TransformProcessor processor() {
    return new TransformProcessor();
}

=============== ================================================== =============

Writer Class

 @BeforeStep
public void beforeStep(StepExecution stepExecution) {
    this.stepExecution = stepExecution;
}

@AfterStep
public void afterStep(StepExecution stepExecution) {
    this.stepExecution.setWriteCount(count);
}

@Override
public void write(final List<? extends List<Record>> lists) throws Exception {

    List<Record> consolidatedList = new ArrayList<>();
    for (List<Record> list : lists) {
        if (!list.isEmpty() && null != list)
            consolidatedList.addAll(list);
    }

    delegate.write(consolidatedList);
    count += consolidatedList.size(); // to count Trailer record count
}

=================== =================================================

Элемент Процессор

@ Переопределить публикацию c Процесс списка (запись IncomingRecord) {

    List<Record> recordList = new ArrayList<>();

    if (null != record.getEventName() and a few other conditions inside this section) {
        // setting values of Record Class by extracting from the IncomingRecord.
        recordList.add(the valid records which matching the condition);
        }else{
        return null;
        }

1 Ответ

0 голосов
/ 16 апреля 2020

Синхронизация операции чтения и операции записи между двумя транзакционными ресурсами (например, очередью и базой данных) возможна с помощью диспетчера транзакций JTA, который координирует оба диспетчера транзакций (протокол 2P C).

Однако такой подход невозможен, если один из ресурсов не является транзакционным (как и большинство файловых систем). Поэтому, если вы не используете транзакционную файловую систему и диспетчер транзакций JTA, который координирует диспетчер транзакций kafka и диспетчер транзакций файловой системы ... вам нужен другой подход, такой как шаблон Компенсирующая транзакция . В вашем случае операция отмены (компенсационное действие) будет перематывать смещение туда, где оно было до сбойного блока.

...