Я новичок в спортивной партии. У меня есть требование, которое нужно прочитать поток kafka и фильтровать данные и сохранить в базе данных. Для этого я использовал весеннюю партию с KafkaItemReader. Когда я запускаю несколько заданий в весеннем задании, это дает java .util.ConcurrentModificationException: KafkaConsumer не является безопасным для многопоточного доступа Ошибка. В это время выполняется только последнее задание.
Это конфигурация пакетной пружины.
@Autowired
TaskExecutor taskExecutor;
@Autowired
JobRepository jobRepository;
@Bean
KafkaItemReader<Long, Event> kafkaItemReader() {
Properties props = new Properties();
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.putAll(this.properties.buildConsumerProperties());
return new KafkaItemReaderBuilder<Long, Event>()
.partitions(0)
.consumerProperties(props)
.name("event-reader")
.saveState(true)
.topic(topicName)
.build();
}
@Bean
public TaskExecutor taskExecutor(){
SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("spring_batch");
asyncTaskExecutor.setConcurrencyLimit(5);
return asyncTaskExecutor;
}
@Bean(name = "JobLauncher")
public JobLauncher simpleJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(taskExecutor);
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
И есть конечная точка контроллера, которая запускает новые задания. Это способ, которым я должен использовать начать новое задание
@Autowired
@Qualifier("JobLauncher")
private JobLauncher jobLauncher;
Map<String, JobParameter> items = new HashMap<>();
items.put("userId", new JobParameter("UserInputId"));
JobParameters paramaters = new JobParameters(items);
try {
jobLauncher.run(job, paramaters);
} catch (Exception e) {
e.printStackTrace();
}
Я видел, что KafkaItemReader не является потоком безопасный . Я хочу знать, является ли этот способ правильным или есть ли способ считывать потоки kafka в многопоточной пружинной пакетной среде. Спасибо и всего наилучшего