обрабатывать несколько объектов, прочитанных через ItemReader одновременно (одновременно) с помощью пакета Spring - PullRequest
0 голосов
/ 28 октября 2018

Я пытаюсь прочитать данные из базы данных и запустить процесс для каждого объекта одновременно.

Моя конфигурация, как показано ниже,

@Bean
public Job job() {
    return jobBuilderFactory.get("job").incrementer(new RunIdIncrementer()).listener(new Listener(videoDao))
            .flow(step1()).end().build();
}

@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<VideosDTO, VideosDTO>chunk(3)
            .reader(databaseVideoItemReader(null))
            .processor(new Processor())
            .writer(new Writer(videoDao))
            .build();
}



 @Bean
 @StepScope
ItemReader<VideosDTO> databaseVideoItemReader(@Value("#{jobParameters[userId]}") String userId) {
    logger.info("Fetching videos for userId:"+userId);
    JdbcCursorItemReader<VideosDTO> databaseReader = new JdbcCursorItemReader<>();
    databaseReader.setDataSource(dataSource);
    databaseReader.setSql("SELECT * FROM voc.t_videos where user_id="+userId+"AND job_success_ind='N'");
    databaseReader.setRowMapper(new BeanPropertyRowMapper<>(VideosDTO.class));
   // databaseReader.open(new ExecutionContext());
   ExecutionContext executionContext= new ExecutionContext();
   executionContext.size();
   databaseReader.open(executionContext);

    return databaseReader;
}

Процесс обработки моего элемента выглядит следующим образом:

@Override
public VideosDTO process(VideosDTO videosDTO) throws Exception {
    log.info("processing........" + videosDTO.getVideoUrl());

    try {
        Process p = Runtime.getRuntime()
                .exec("C:\\Program Files\\Git\\bin\\bash.exe " + "D:\\DRM\\script.sh " + videosDTO.getVideoUrl());
        // .exec("D:\\PortableGit\\bin\\bash.exe
        // D:\\Vocabimate_Files\\script.sh "+videosDTO.getVideoUrl());
        // Thread.sleep(1000);
        Thread.sleep(1000);
        p.destroy();
        try {
            p.waitFor();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        try (InputStream is = p.getErrorStream()) {
            int in = -1;
            while ((in = is.read()) != -1) {
                System.out.print((char) in);
            }
        }
        try (InputStream is = p.getInputStream()) {
            int in = -1;
            while ((in = is.read()) != -1) {
                System.out.print((char) in);
            }
        }
    } catch (IOException e2) {
        // TODO Auto-generated catch block
        e2.printStackTrace();
    }

    return videosDTO;
}

Писатель работает следующим образом:

    @Override
public void write(List<? extends VideosDTO>videosList) throws Exception {

    for(VideosDTO vid:videosList){
        log.info("writting...."+vid.getVideoUrl());
    }

}

Предположим, есть 3 объекта, выбранных изБД этот код сначала завершает процесс на первом объекте, затем на втором и третьем, чем начинается запись. Я хочу одновременно запустить процесс на трех объектах, чем выполнять операцию записи.

Есть ли какие-либоспособ сделать это?

Ответы [ 2 ]

0 голосов
/ 29 октября 2018

Использование многопоточного шага, как предложено @dimitrisli, - это путь.В дополнение к этому, другим способом является использование AsyncItemProcessor (в сочетании с AsyncItemWriter).

Аналогичный вариант использования (асинхронный вызов конечной точки покоя из процессора)можно найти здесь: https://stackoverflow.com/a/52309260/5019386, где я привел некоторые подробности.

Надеюсь, это поможет.

0 голосов
/ 28 октября 2018

Не вдаваясь в детали вашего пользовательского устройства чтения / обработки / записи, я думаю, что вы ищете многопоточный шаг .

Как также описано вышесвязанная документация, чтобы сделать ваш шаг многопоточным (то есть чтение / обработка / запись каждого чанка в отдельном потоке), вам сначала нужно зарегистрировать SimpleAsyncTaskExecutor:

@Bean
public TaskExecutor taskExecutor(){
    return new SimpleAsyncTaskExecutor("myAsyncTaskExecutor");
}

, а затем зарегистрировать этого исполнителя задачв строителе вашего шага:

@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<VideosDTO, VideosDTO>chunk(3)
            .reader(databaseVideoItemReader(null))
            .processor(new Processor())
            .writer(new Writer(videoDao))
            //making the Step multi-threaded
            .taskExecutor(taskExecutor())
            .build();
}
...