весенняя партия не обрабатывает все записи - PullRequest
0 голосов
/ 16 октября 2019

Я использую Spring Batch для чтения записей из базы данных postgresql с помощью RepositoryItemReader, а затем записать его в тему. Я вижу, что было обработано около 1 миллиона записей, но не все записи были обработаны. Я установил pageSize для читателя равным 10 000 и таким же, как интервал фиксации (размер фрагмента)

@Bean
public TaskletStep broadcastProductsStep(){
    return stepBuilderFactory.get("broadcastProducts")
            .<Product, Product> chunk(10000)
            .reader(productsReader.repositoryItemReader())
            .processor(productsProcessor)
            .writer(compositeItemWriter)                    
            .faultTolerant()
            .skip(Exception.class)                              
            .skipLimit(100000)
            .processorNonTransactional()                        
            .listener(new SkipListenerProducts())               
            .listener(productsChunkListener)
            .build();
}


@Bean
public RepositoryItemReader repositoryItemReader() {

    RepositoryItemReader<Product> repositoryReader = new RepositoryItemReader<>();

    try {
        repositoryReader.setRepository(skuRepository);
        repositoryReader.setMethodName("findByIsUpdatedAndStatusCodeIn");
        repositoryReader.setPageSize(10000);
        repositoryReader.setSaveState(false);

        List<List<String>> arguments = new ArrayList<>();
        arguments.add(Stream.of(SkuStatus.RELEASED.getValue().toString(), SkuStatus.BLOCKED.getValue().toString(),
                SkuStatus.DISCONTINUED.getValue().toString())
                  .collect(Collectors.toList()));
        repositoryReader.setArguments(arguments);

        Map sorts = new HashMap();
        sorts.put("catalog_number", Sort.Direction.ASC);

        repositoryReader.setSort(sorts);
        repositoryReader.afterPropertiesSet();

    } catch (Exception exception){
        exception.printStackTrace();
    }

    return repositoryReader;
}

@Query(value = "SELECT * FROM CATALOG.PRODUCTS WHERE IS_UPDATED = 'true' AND STATUS_CODE IN (:statusCode)",
       countQuery = "SELECT COUNT(*) FROM CATALOG.PRODUCTS WHERE IS_UPDATED = 'true' AND STATUS_CODE IN (:statusCode)",
       nativeQuery = true)
public Page<Product> findByIsUpdatedAndStatusCodeIn(@Param(value = "statusCode") List<String> statusCode, 
        Pageable pageable);

1 Ответ

0 голосов
/ 18 октября 2019

Возможно, проблема в том, что вы смешиваете нумерацию страниц и обновляете критерии запроса чтения (IS_UPDATED).

Пример с размером страницы = 2 и 6 строками в дБ

  • A IS_UPDATED = true
  • B IS_UPDATED = true
  • C IS_UPDATED = истина
  • D IS_UPDATED = истина
  • E IS_UPDATED = истина
  • F IS_UPDATED = true

Первая прочитанная страница = 1 возвращаемые строки A и B

После выполнения записи (установите IS_UPDATED в false для A & B), мы имеем в db:

  • C IS_UPDATED = true
  • D IS_UPDATED = true
  • E IS_UPDATED = true
  • F IS_UPDATED = true

Второе чтение переместится на страницу 2 , поэтому оно займет строку E & F , а не C & D

Либо:

  1. вы не должны обновлять столбец IS_UPDATED.
  2. Или вы создаете подкласс RepositoryItemReader, где вы переопределяете getPage
    @Override
    public int getPage() {
        return 0;
    }

Опция 2 более устойчива к пакетному аварийному завершению / ошибке, но вы должны убедиться, что IS_UPDATEDвсегда устанавливайте в записывающем устройстве значение false, иначе читатель будет бесконечно зацикливаться.

Вариант 2 также потребует дополнительной настройки, если вы используете многопоточный шаг.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...