Как повторить работу с Partitioner, когда данные динамические с Spring Batch? - PullRequest
0 голосов
/ 22 апреля 2019

Я пытаюсь разработать пакетный процесс, используя Spring Batch + Spring Boot (конфигурация Java), но у меня проблема с этим.У меня есть программное обеспечение, которое имеет базу данных и API Java, и я читаю записи оттуда.Пакетный процесс должен извлечь все документы, срок действия которых меньше определенной даты, обновить дату и снова сохранить их в той же базе данных.

Мой первый подход - чтение записей 100 на 100;поэтому ItemReader извлекает 100 записей, я обрабатываю их 1 на 1 и, наконец, пишу их снова.В считыватель я поместил этот код:

public class DocumentItemReader implements ItemReader<Document> {

    public List<Document> documents = new ArrayList<>();

    @Override
    public Document read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {

        if(documents.isEmpty()) {
            getDocuments(); // This method retrieve 100 documents and store them in "documents" list.
            if(documents.isEmpty()) return null;
        }

        Document doc = documents.get(0);
        documents.remove(0);
        return doc;
    }
}

Итак, с этим кодом считыватель читает из базы данных, пока записи не найдены.Когда метод «getDocuments ()» не извлекает никаких документов, список пуст, и читатель возвращает ноль (поэтому задание завершено).Здесь все работало нормально.

Однако проблема возникает, если я хочу использовать несколько потоков.В этом случае я начал использовать подход Partitioner вместо многопоточности.Причина в том, что я читаю из одной и той же базы данных, поэтому, если я повторю полный шаг с несколькими потоками, все они найдут одинаковые записи, и я не смогу использовать нумерацию страниц (см. Ниже).

Другая проблема заключается в том, что записи базы данных обновляются динамически, поэтому я не могу использовать нумерацию страниц.Например, предположим, у меня есть 200 записей, и все они скоро истекают, поэтому процесс их извлечет.Теперь представьте, что я получаю 10 с одним потоком, и прежде всего этот поток обрабатывает один и обновляет его в той же базе данных.Следующий поток не может получить от 11 до 20 записей, так как первая запись не будет появляться в поиске (поскольку она была обработана, ее дата была обновлена, а затем она не соответствует запросу).

Это немного сложно понять, и некоторые вещи могут показаться странными, но в моем проекте:

  • Я вынужден использовать одну и ту же базу данных для чтения и записи.
  • У меня могут быть миллионы документов, поэтому я не могу прочитать все записи одновременно.Мне нужно прочитать их 100 на 100 или 500 на 500.
  • Мне нужно использовать несколько потоков.
  • Я не могу использовать нумерацию страниц, поскольку запрос к базе данных будет каждый раз получать разные документыоно выполняется.

Итак, после долгих раздумий, я думаю, единственное возможное решение - повторять задание, пока запрос не получит никаких документов.Это возможно?Я хочу сделать что-то вроде шага: Делайте что-то, пока не будет возвращено значение null - повторяйте задание, пока запрос не вернет ноль записей.

Если это не очень хороший подход, я буду признателен за другие возможные решения.

Спасибо.

1 Ответ

0 голосов
/ 23 апреля 2019

Возможно, вы можете добавить разделитель к вашему шагу, который будет:

  1. Выбрать все идентификаторы данных, которые необходимо обновить (и другие столбцы, если необходимо)
  2. Разделитьих в разделах x (x = gridSize) и записать их во временный файл (1 по разделам).
  3. Зарегистрировать имя файла для чтения в executeContext

Тогда ваш читатель нечтение из базы данных больше, но не из секционированного файла.

Кажется сложным, но это не так уж много, вот пример, который обрабатывает миллионы записей, используя запрос JDBC, но его можно легко перенести для вашего варианта использования:

public class JdbcToFilePartitioner implements Partitioner {

    /** number of records by database fetch  */
    private int fetchSize = 100;

    /** working directory */
    private File tmpDir;

    /** limit the number of item to select */
    private Long nbItemMax;

    @Override
    public Map<String, ExecutionContext> partition(final int gridSize) {

        // Create contexts for each parttion
        Map<String, ExecutionContext> executionsContexte = createExecutionsContext(gridSize);

        // Fill partition with ids to handle
        getIdsAndFillPartitionFiles(executionsContexte);

        return executionsContexte;
    }

    /**
     * @param gridSize number of partitions
     * @return map of execution context, one for each partition
     */
    private Map<String, ExecutionContext> createExecutionsContext(final int gridSize) {

        final Map<String, ExecutionContext> map = new HashMap<>();

        for (int partitionId = 0; partitionId < gridSize; partitionId++) {
            map.put(String.valueOf(partitionId), createContext(partitionId));
        }

        return map;
    }

    /**
     * @param partitionId id of the partition to create context
     * @return created executionContext
     */
    private ExecutionContext createContext(final int partitionId) {

        final ExecutionContext context = new ExecutionContext();

        String fileName = tmpDir + File.separator + "partition_" + partitionId + ".txt";

        context.put(PartitionerConstantes.ID_GRID.getCode(), partitionId);
        context.put(PartitionerConstantes.FILE_NAME.getCode(), fileName);

        if (contextParameters != null) {
            for (Entry<String, Object> entry : contextParameters.entrySet()) {
                context.put(entry.getKey(), entry.getValue());
            }
        }

        return context;
    }

    private void getIdsAndFillPartitionFiles(final Map<String, ExecutionContext> executionsContexte) {

        List<BufferedWriter> fileWriters = new ArrayList<>();
        try {

            // BufferedWriter for each partition
            for (int i = 0; i < executionsContexte.size(); i++) {
                BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(executionsContexte.get(String.valueOf(i)).getString(
                        PartitionerConstantes.FILE_NAME.getCode())));
                fileWriters.add(bufferedWriter);
            }

            // Fetching the datas
            ScrollableResults results = runQuery();

            // Get the result and fill the files
            int currentPartition = 0;
            int nbWriting = 0;
            while (results.next()) {
                fileWriters.get(currentPartition).write(results.get(0).toString());
                fileWriters.get(currentPartition).newLine();
                currentPartition++;
                nbWriting++;

                // If we already write on all partitions, we start again
                if (currentPartition >= executionsContexte.size()) {
                    currentPartition = 0;
                }

                // If we reach the max item to read we stop
                if (nbItemMax != null && nbItemMax != 0 && nbWriting >= nbItemMax) {
                    break;
                }
            }

            // closing
            results.close();
            session.close();
            for (BufferedWriter bufferedWriter : fileWriters) {
                bufferedWriter.close();
            }
        } catch (IOException | SQLException e) {
            throw new UnexpectedJobExecutionException("Error writing partition file", e);
        }
    }

    private ScrollableResults runQuery() {
        ...
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...