Пакет Spring - JdbcPagingItemReader, SQLServerException: недопустимое имя столбца XYZ - PullRequest
0 голосов
/ 14 июля 2020

У меня есть задание, которое читает из SQL списка документов базы данных сервера. Документы должны быть в каком-то статусе и отсортированы по столбцу status_updated_time. Я хочу прочитать document.id, а затем обработать его в обработчике заданий как Driving Query Based ItemReaders .
Статус столбца изменен в Writer, поэтому я не могу использовать JpaPagingItemReader из-за эта проблема .
Я использовал JdbcPagingItemReader, но получил ошибку при сортировке по status_updated_time. Затем я попытался добавить и id к сортировке, но это не помогло.
Запрос, который я хочу получить:

SELECT id 
FROM document 
WHERE status IN (0, 1, 2)
ORDER BY status_updated_time ASC, id ASC 

Мой читатель:

@StepScope
@Bean
private ItemReader<Long> statusReader() {
JdbcPagingItemReader<Long> reader = new JdbcPagingItemReader<>();
...
reader.setRowMapper(SingleColumnRowMapper.newInstance(Long.class));
...

Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("status_updated_time", Order.ASCENDING);
sortKeys.put("id", Order.ASCENDING);

SqlServerPagingQueryProvider queryProvider = new SqlServerPagingQueryProvider();
queryProvider.setSelectClause(SELECT_CLAUSE);
queryProvider.setFromClause(FROM_CLAUSE);
queryProvider.setWhereClause(WHERE_CLAUSE);
queryProvider.setSortKeys(sortKeys);

reader.setQueryProvider(queryProvider);
...
return reader;
}

Где константы:

private static final String SELECT_CLAUSE = "id";
private static final String FROM_CLAUSE = "document";
private static final String WHERE_CLAUSE = "status IN (0, 1, 2) ";

Когда задание выполняется, я получаю сообщение об ошибке:

org.springframework.dao.TransientDataAccessResourceException: StatementCallback; SQL [SELECT TOP 10 id FROM document WHERE status IN (0, 1, 2) ORDER BY id ASC, status_updated_time ASC]; The column name status_updated_time is not valid.; nested exception is com.microsoft.sqlserver.jdbc.SQLServerException: The column name status_updated_time is not valid.
at org.springframework.jdbc.support.SQLStateSQLExceptionTranslator.doTranslate(SQLStateSQLExceptionTranslator.java:110)
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:72)
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:81)
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:81)
at org.springframework.jdbc.core.JdbcTemplate.translateException(JdbcTemplate.java:1443)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:388)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:452)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:462)
at org.springframework.batch.item.database.JdbcPagingItemReader.doReadPage(JdbcPagingItemReader.java:210)
at org.springframework.batch.item.database.AbstractPagingItemReader.doRead(AbstractPagingItemReader.java:108)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:92)
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:94)
at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:87)
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:119)

Я видел вопрос относительно The column name XYZ is not valid при переполнении стека ( это ...), но не видел ничего, что работало бы в моем случае, когда мне нужно отсортировать по другому столбцу.
Другая проблема - сортировка столбцов.
Неважно, добавляю ли я сначала status_updated_time или id для сортировки карты в сгенерированном скрипте всегда ORDER BY id ASC, status_updated_time ASC.

РЕДАКТИРОВАТЬ: Чтение этого вопроса , особенно этой строки:

JdbcPagingItemReader здесь предполагает, что ключ сортировки и столбец в предложении select вызываются одинаково

Я понял, что мне нужен столбец status_updated_time в наборе результатов, поэтому я провел рефакторинг:

private static final String SELECT_CLAUSE = "id, status_updated_time";
...
queryProvider.setSelectClause(SELECT_CLAUSE);
...
reader.setRowMapper(
    (rs, i) -> {
      Document document = new Document();
      document.setId(rs.getLong(1));
      document.setStatusUpdatedTime(rs.getObject(2, Timestamp.class));
      return document;
    }
);

Теперь приложение может компилироваться и задание может выполняться.

Но проблема с сортировкой осталась прежней. Я не могу заказать сначала status_updated_time, а затем id. id всегда идет первым.
Я попытался удалить id из сортировки и столкнулся с другой проблемой. На тестовом env. Мне нужно было обработать 1600 строк. Строка моего рабочего процесса и обновление status_updated_time до now(). Когда задание начало обработку, он не остановился на 1600, а продолжил обработку, потому что каждая строка получила новую status_updated_time, и читатель считает ее новой строкой и продолжал обрабатывать бесконечно.
При сортировке только по id задание обработало 1600 строк а затем остановился.
Так что, похоже, я не могу использовать JdbcPagingItemReader из-за проблемы с сортировкой.
И мне нужен был какой-нибудь считыватель, который мог бы работать параллельно, чтобы ускорить это задание (он выполняется примерно 20 минут каждый час через день).
Есть предложения?

1 Ответ

0 голосов
/ 27 июля 2020

Я хочу поблагодарить Махмуда за то, что он отслеживал вопрос о Spring Batch и пытался помочь. Но его предложение мне не помогло, поэтому я применил другой подход. Я использовал временную (вспомогательную) таблицу для подготовки данных для выполнения основного шага, и на основном шаге читатель читает из этой таблицы.

Первый шаг сбросит справочную таблицу:

@Bean
private Step dropHelpTable() {
  return stepBuilderFactory
    .get(STEP_DROP_HELP_TABLE)
    .transactionManager(cronTransactionManager)
    .tasklet(dropHelpTableTasklet())
    .build();
}

private Tasklet dropHelpTableTasklet() {
  return (contribution, chunkContext) -> {
    jdbcTemplate.execute(DROP_SCRIPT);
    return RepeatStatus.FINISHED;
  };
}

private static final String STEP_DROP_HELP_TABLE = "dropHelpTable";
private static final String DROP_SCRIPT = "IF EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.TABLES "
  + "WHERE TABLE_NAME = 'query_document_helper') "
  + "BEGIN "
  + " DROP TABLE query_document_helper "
  + "END";

Второй шаг подготовим данные. Вставьте идентификатор документа, который будет обработан позже:

@Bean
private Step insertDataToHelpTable() {
  return stepBuilderFactory
    .get(STEP_INSERT_HELP_TABLE)
    .transactionManager(cronTransactionManager)
    .tasklet(insertDataToHelpTableTasklet())
    .build();
}

private Tasklet insertDataToHelpTableTasklet() {
  return (contribution, chunkContext) -> {
    jdbcTemplate.execute("SELECT TOP " + limit + " id " + INSERT_SCRIPT);
    return RepeatStatus.FINISHED;
  };
}

private static final String STEP_INSERT_HELP_TABLE = "insertHelpTable";
private static final String INSERT_SCRIPT = "INTO query_document_helper "
  + "FROM dbo.document "
  + "WHERE status IN (0, 1, 2) "
  + "ORDER BY status_updated_time ASC";

@Value("${cron.batchjob.queryDocument.limit}")
private Integer limit;

После этого у меня есть все данные, которые будут использоваться при выполнении одного задания, поэтому упорядочивание по status_updated_time больше не требуется (условие не было для обработки самого молодого документа при выполнении этого задания, но при более позднем выполнении, когда они станут старейшими). На следующем шаге я использую обычный ридер.

@Bean
private Step queryDocumentStep() {
  return stepBuilderFactory
    .get(STEP_QUERY_NEW_DOCUMENT_STATUS)
    .transactionManager(cronTransactionManager)
    .<Long, Document>chunk(chunk)
    .reader(documentReader())
    ...
    .taskExecutor(multiThreadingTaskExecutor.threadPoolTaskExecutor())
    .build();
}

@StepScope
@Bean
private ItemReader<Long> documentReader() {
  JdbcPagingItemReader<Long> reader = new JdbcPagingItemReader<>();
  reader.setDataSource(coreBatchDataSource);
  reader.setMaxItemCount(limit);
  reader.setPageSize(chunk);
  ...
  Map<String, Order> sortKeys = new HashMap<>();
  sortKeys.put("id", Order.ASCENDING);

  SqlServerPagingQueryProvider queryProvider = new SqlServerPagingQueryProvider();
  queryProvider.setSelectClause(SELECT_CLAUSE);
  queryProvider.setFromClause(FROM_CLAUSE);
  queryProvider.setSortKeys(sortKeys);

  reader.setQueryProvider(queryProvider);
  ...
  return reader;
}

private static final String STEP_QUERY_NEW_DOCUMENT_STATUS = "queryNewDocumentStatus";
private static final String SELECT_CLAUSE = "id";
private static final String FROM_CLAUSE = "query_archive_document_helper";

И работа выглядит так:

@Bean
public Job queryDocumentJob() {
return jobBuilderFactory
    .get(JOB_QUERY_DOCUMENT)
    .incrementer(new RunIdIncrementer())
    .start(dropHelpTable())
    .next(insertDataToHelpTable())
    .next(queryDocumentStep())
    .build();
}

private static final String JOB_QUERY_DOCUMENT = "queryDocument";
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...