Как сделать процессор блокирования элементов в весеннем пакете (не только асинхронным с TaskExecuter)? - PullRequest
0 голосов
/ 20 апреля 2020

Весенняя партия имеет средство под названием AsyncItemProcessor. Он просто оборачивает ItemProcessor и запускает его с TaskExecutor, поэтому он может работать асинхронно. Я хочу иметь вызов rest в этом ItemProcessor, проблема в том, что каждый поток внутри этого TaskExecutor, который делает вызов rest, будет заблокирован до получения ответа. Я хочу сделать его неблокирующим, что-то вроде реактивной парадигмы.

У меня есть ItemProcessor, который вызывает точку отдыха и получает ответ:

    @Bean
    public ItemProcessor<String, String> testItemProcessor() {
        return item -> {
            String url = "http://localhost:8787/test";
            try {
                // it's a long time process and take a lot of time
                String response = restTemplate.exchange(new URI(url), HttpMethod.GET, new RequestEntity(HttpMethod.GET, new URI(url)), String.class).getBody();
                return response;
            } catch (URISyntaxException e) {
                e.printStackTrace();
                return null;
            }
        };
    }

Теперь я обертываю это с помощью AsyncItemProcessor:

    @Bean
    public AsyncItemProcessor testAsyncItemProcessor() throws Exception {
        AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(testItemProcessor());
        asyncItemProcessor.setTaskExecutor(testThreadPoolTaskExecutor());
        asyncItemProcessor.afterPropertiesSet();
        return asyncItemProcessor;
    }

    @Bean
    public ThreadPoolTaskExecutor testThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(50);
        threadPoolTaskExecutor.setMaxPoolSize(100);
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        return threadPoolTaskExecutor;
    }

Я использовал ThreadPoolTaskExecutor в качестве TaskExecuter.

Это ItemWriter:

    @Bean
    public ItemWriter<String> testItemWriter() {
        return items -> {
            // I write them to a file and a database, but for simplicity:
            for (String item : items) {
                System.out.println(item);
            }
        };
    }

    @Bean
    public AsyncItemWriter asyncTestItemWriter() throws Exception {
        AsyncItemWriter asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(testItemWriter());
        asyncItemWriter.afterPropertiesSet();
        return asyncItemWriter;
    }

Настройка шага и задания:

    @Bean
    public Step testStep() throws Exception {
        return stepBuilderFactory.get("testStep")
                .<String, String>chunk(1000)
                .reader(testItemReader())
                .processor(testAsyncItemProcessor())
                .writer(asyncTestItemWriter())
                .build();
    }


    @Bean
    public Job testJob() throws Exception {
        return jobBuilderFactory.get("testJob")
                .start(testStep())
                .build();
    }

ItemReader является простым ListItemReader:

    @Bean
    public ItemReader<String> testItemReader() {
        List<String> integerList = new ArrayList<>();
        for (int i=0; i<10000; i++) {
            integerList.add(String.valueOf(i));
        }
        return new ListItemReader(integerList);
    }

Теперь у меня есть ThreadPoolTaskExecutor с 50 ~ 100 нитями. Каждый поток внутри ItemProcessor выполняет вызов rest и ожидает / блокирует получение ответа от сервера. Есть ли способ сделать эти вызовы / процесс неблокирующим? Если ответ «да», как я должен проектировать ItemWriter? Внутри ItemWriter я хочу записать результаты из ItemProcessor в файл и базу данных. Каждый блок имеет размер 1000, я могу подождать, пока все записи внутри него не будут обработаны, но я не хочу блокировать поток на каждый вызов rest внутри блока. Есть ли способ выполнить sh это?

Я знаю, что шаблон отдыха Spring - это тот, который делает блокировку процесса и веб-клиент должен использоваться, но есть ли какой-нибудь эквивалентный компонент в весеннем пакете (вместо AsyncItemProcessor / AsyncItemWriter) для веб-клиента?

1 Ответ

0 голосов
/ 20 апреля 2020

Нет, пока нет поддержки реактивного программирования в Spring Batch, здесь есть запрос на открытую функцию: https://github.com/spring-projects/spring-batch/issues/1008.

Обратите внимание, что реактивное действие означает все стек должен быть реактивным, от пакетных артефактов (читатель, процессор, записывающее устройство, слушатели и т. д. c) до инфраструктурных компонентов (репозиторий заданий, менеджер транзакций и т. д. c), а не только ваш процессор элементов и записывающее устройство.

Более того, текущая модель обработки фрагментов фактически несовместима с реактивной парадигмой. Причина в том, что ChunkOrientedTasklet использует в основном двух соавторов:

  • A ChunkProvider, который предоставляет куски элементов (делегирование чтения элементов в ItemReader)
  • A ChunkProcessor, который обрабатывает куски (делегируя обработку и запись соответственно ItemProcessor / ItemWriter)

Вот упрощенная версия кода:

Chunk inputs = chunkProvider.provide();
chunkProcessor.process(inputs);

Как вы можете видеть, шаг будет ждать, пока chunkProcessor (процессор + модуль записи) обработает весь кусок, прежде чем читать следующий. Таким образом, в вашем случае, даже если вы используете неблокирующие API-интерфейсы в вашем процессоре + модуле записи, ваш шаг будет ждать полной обработки блока перед чтением следующего блока (кроме ожидания блокирования взаимодействия с репозиторием заданий и менеджером транзакций) .

...