Пружинная партия KafkaConsumer не безопасна для многопоточного доступа - PullRequest
1 голос
/ 16 апреля 2020

Я новичок в спортивной партии. У меня есть требование, которое нужно прочитать поток kafka и фильтровать данные и сохранить в базе данных. Для этого я использовал весеннюю партию с KafkaItemReader. Когда я запускаю несколько заданий в весеннем задании, это дает java .util.ConcurrentModificationException: KafkaConsumer не является безопасным для многопоточного доступа Ошибка. В это время выполняется только последнее задание.

Это конфигурация пакетной пружины.

    @Autowired
    TaskExecutor taskExecutor;

    @Autowired
    JobRepository jobRepository;

    @Bean
    KafkaItemReader<Long, Event> kafkaItemReader() {
        Properties props = new Properties();
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        props.putAll(this.properties.buildConsumerProperties());
        return new KafkaItemReaderBuilder<Long, Event>()
                .partitions(0)
                .consumerProperties(props)
                .name("event-reader")
                .saveState(true)
                .topic(topicName)
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor(){
        SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("spring_batch");
        asyncTaskExecutor.setConcurrencyLimit(5);
        return asyncTaskExecutor;
    }

    @Bean(name = "JobLauncher")
    public JobLauncher simpleJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(taskExecutor);
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

И есть конечная точка контроллера, которая запускает новые задания. Это способ, которым я должен использовать начать новое задание

    @Autowired
    @Qualifier("JobLauncher")
    private JobLauncher jobLauncher;


    Map<String, JobParameter> items = new HashMap<>();
    items.put("userId", new JobParameter("UserInputId"));
    JobParameters paramaters = new JobParameters(items);
    try {
        jobLauncher.run(job, paramaters);
    } catch (Exception e) {
        e.printStackTrace();
    }

Я видел, что KafkaItemReader не является потоком безопасный . Я хочу знать, является ли этот способ правильным или есть ли способ считывать потоки kafka в многопоточной пружинной пакетной среде. Спасибо и всего наилучшего

Ответы [ 2 ]

1 голос
/ 16 апреля 2020

В соответствии с весной документация , он использует KafkaConsumer; который сам по себе не является потокобезопасным в соответствии с их подробной документацией .

. Пожалуйста, посмотрите, можете ли вы использовать любой из подходов (например, разделение или один потребитель на поток), как указано в этой документации. В вашем примере вам может понадобиться использовать отдельный обработчик для taskexecutor (если вы используете подход развязки).

1 голос
/ 16 апреля 2020

Документ KafkaItemReader задокументирован как не ориентированный на многопотоковое исполнение. Вот выдержка из его Javado c:

Since KafkaConsumer is not thread-safe, this reader is not thread-safe.

Таким образом, его использование в многопоточном окружении неверно и не соответствует документации. Что вы можете сделать, так это использовать читатель для каждого раздела.

...