Spring Batch - Чанкинг и многопоточные шаги - исключение Nullpointer в RowMapper - PullRequest
0 голосов
/ 25 сентября 2019

Когда я выполняю свой шаг в нескольких потоках, я получаю исключение нулевого указателя внутри моего средства отображения строк при обработке результирующего набора даже для записей, которые имеют явную проверку нуля.Работает нормально, когда я выполняю его без taskExecutor() / на одном потоке.Я запутался в нескольких вещах.Насколько я понимаю, если я укажу интервал фиксации как 100, а количество основных потоков - как 10, каждый поток тянет кусок по 100 каждый и работает с ним независимо.

  • Как работает chunking-reader-rowкартографическое трио работает?Если в моем ридере есть запрос, извлекающий 1 миллион строк и размер фрагмента 1000, значит ли это, что ридер попадет в базу данных 1000 раз?И после каждого раза, отображение строки будет отображать все 1000 выбранных строк? Как многопоточность влияет на отображение строк?

Код, как показано ниже:

@Bean
public Step myStep() {
    return stepBuilderFactory.get(STEP_NAME).<MyModel, MyModel> chunk(1000)
            .reader(myModelReader())
            .writer(myModelWriter())
            .taskExecutor(taskExecutor())
            .listener(stepExecutionNotificationListener)
            .listener(chunkExecutionListener)
            .build();

}

@Bean
public Job myJob() {
    return jobBuilderFactory.get(JOB_NAME)
            .incrementer(new RunIdIncrementer())
            .listener(jobCompletionNotificationListener)
            .flow(myStep()).end().build();

}

@Bean
@StepScope
public JdbcCursorItemReader<MyModel> myModelReader(){
    JdbcCursorItemReader<MyModel> reader = new JdbcCursorItemReader<>();
    reader.setDataSource(dataSource);
    reader.setVerifyCursorPosition(false);
    reader.setSql("my query fetching millions of records joining multiple tables from the db");
    reader.setRowMapper(new MyModelRowMapper());

    return reader;
}

public class MyModelRowMapperimplements RowMapper<MyModel>{

    @Override
    public MyModel mapRow(ResultSet rs, int rowNum) throws SQLException {
      MyModel myModel = new MyModel();
      myModel.setEmailAddress(checkIsEmpty(rs.getString("EMAIL_ADDRESS")) ? "" : rs.getString("EMAIL_ADDRESS").replace("|", "")); // ----- The line which is failing!!! -----
      return person;             
    }

}
public boolean checkIsEmpty(String stringToCheck)
{
    if(stringToCheck==null || stringToCheck.isEmpty() || stringToCheck.equals("null"))
    {
        return true;
    }
    return false;
}
public TaskExecutor taskExecutor(){
    ThreadPoolTaskExecutor threadPoolTaskExecutor=new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(10);
    threadPoolTaskExecutor.setMaxPoolSize(25);
    threadPoolTaskExecutor.setQueueCapacity(5);
    threadPoolTaskExecutor.setThreadNamePrefix("MyModelBatch-");
    threadPoolTaskExecutor.afterPropertiesSet();
    return threadPoolTaskExecutor;
}

Редактировать 1

Отдельноот работы в непотоковом контексте, это также работает, если я использую набор результатов один раз.Я изменил свой код как

String email = rs.getString("EMAIL_ADDRESS");
myModel.setEmailAddress(checkIsEmpty(email) ? "" : email.replace("|", ""));

Ответы [ 2 ]

1 голос
/ 25 сентября 2019

JdbcCursorItemReader не является потокобезопасным (см. Его javadoc и более подробную информацию в этом ответе ).Причина этого заключается в том, что он оборачивает одну ResultSet, которая не является поточно-ориентированной.

Таким образом, ваша проблема связана с использованием считывающего устройства, не поддерживающего потоки, в многопоточном шаге.Согласно Javadoc:

Каждый вызов read() будет вызывать предоставленный RowMapper, передавая ResultSet.

Поскольку read не синхронизирован, каждый потокможете вызвать его для чтения элементов.

Чтобы исправить проблему, вы можете обернуть свой Jdbc-ридер в SynchronizedItemStreamReader.

0 голосов
/ 25 сентября 2019

Поскольку обработка ResultSet является последовательной операцией, и она выполняется вашим RowMapper внутри JdbcCursorItemReader, не должно быть никакой возможности вмешательства потока (код довольно прост, если вы посмотрите на него).: read row -> map row to obj -> return obj).

ResultSet также не должны возвращать различные значения для повторных вызовов на getXXX, хотя это, вероятно, не гарантируется (хотя это будетстранная реализация, и я никогда не слышал о драйвере, который бы это делал).

Таким образом, чтобы вы могли получить описанную ошибку, ResultSet должно быть разделено между двумя JdbcCursorItemReaders, но я действительно не вижу, как это могло случиться, темы или нет темы.Тогда вы можете получить ситуацию, когда набор результатов уже продвинут, но опять же ... они не могут поделиться ResultSets.

Это может быть некоторой проблемой конфигурации, но я не сделалВесенняя партия через некоторое время, так что не могу сказать, от макушки моей головы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...