Как распараллелить одну кварцевую работу с пулами потоков? - PullRequest
0 голосов
/ 06 ноября 2019

В моем загрузочном приложении Spring есть запланированное кварцевое задание для отправки элементов из большого списка на некоторые веб-сервисы методом, запускаемым каждые 5 минут.

Мне было интересно, что мой процесс отправки (один метод отправки) может быть парализован? Что я хочу, например, когда список 10000 элементов пришел из БД и потоки из пула потоков будут работать одновременно сотправить все записи в этом списке, и работа будет завершена после отправки всех записей.

Что я пробовал ниже кода, я установил ThreadPoolTaskExecutor с размером пула 5 потоков. И все же, когда я выполняю и проверяю журналы заданий, в нем говорится, что задания завершаются за секунды, но для отправки всех данных требуется несколько минут. Он продолжал работать правильно, но Иов, казалось, закончил в считанные секунды. Это, вероятно, говорит, что работа завершена после того, как все потоки установлены. Это то, чего я избегаю, потому что мне нужно знать время выполнения задания и журналы.

@Autowired
MyService myService;

@NonTransactionalService 
public class MySenderService{

    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(5);
    taskExecutor.setMaxPoolSize(5);
    taskExecutor.initialize();

    public void sendAll(){
        List<Long> largeList = someMethod();
        largeList.stream().forEach(i -> {
            taskExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    myService.send(i);
                }
            });
        }
    }
}

Итак, как я могу запустить этот метод отправки с работниками mutliple внутри одного задания?
Или иначе это хорошая практика для установкинесколько идентичных заданий для отправки одного и того же списка одним и тем же способом?

1 Ответ

0 голосов
/ 06 ноября 2019

Вы можете попробовать использовать Flux, IMHO - это решение, которое требует меньше кода:

        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(5);
        taskExecutor.initialize();

        List<Integer> largeList = someMethod();
        System.out.println(largeList);
        Flux.fromStream(largeList.stream())
                .parallel(5)
                .runOn(Schedulers.fromExecutor(taskExecutor))
                .subscribe( x -> { System.out.println(x);});

        taskExecutor.shutdown();

Не забудьте выключить TaskExecutor, но как @M. Дейнм сказал, что было бы лучше, если бы он был создан в другом месте и добавлен к вашим услугам.

...