Весенняя партия Parrall step - PullRequest
0 голосов
/ 19 ноября 2018

У меня есть подпружиненное задание, в котором мне нужно передать в качестве входных данных задание список идентификаторов, я бы хотел, чтобы из этого списка идентификаторов можно было перейти к шагу, который мог бы выполнять все из них параллельно,На данный момент я выполнил несколько экземпляров задания в threadpoolExecutor, который выполняет задание x количество раз.Это подразумевает, что он выполняет одиночные запросы для всех заданий.И мы говорим о более чем 50 миллионах записей.Записи представляют временную серию @ конкретный день потребления.Мне нужно агрегировать id и batchId по месяцам и отправлять эту информацию брокеру.

  • Reader -> считывает данные из базы данных в соответствии с id и временными метками, представляющими временной ряд.
  • Процессор -> PassThroughItemProcessor
  • Writer -> Отправить в AMQP (агрегирует список элементов)

Есть ли какие-либо рекомендации, которые вы могли бы предоставить мне?


Согласно предложениям, так выглядит мой разделитель;

@Override
public Map<String, ExecutionContext> partition(int gridSize) {
    log.debug("START: Partition");

    Map<String, ExecutionContext> partitionMap = new HashMap<>();
    final AtomicInteger counter = new AtomicInteger(0);
    final AtomicInteger partitionerCounter = new AtomicInteger(0);
    Page<Integer> result = null;
    do {
        result = repository.findDistinctByBatchId(LocalDateTime.parse(batchId, AipForecastService.DEFAULT_DATE_TIME_FORMATTER), Optional.ofNullable(result)
                .map(Page::nextPageable)
                .orElse(PageRequest.of(0, 100000)));
        result
                .stream()
                .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / 100))
                .values()
                .forEach(listOfInstallation -> {
                    ExecutionContext context = new ExecutionContext();
                    context.put("listOfInstallation", listOfInstallation);
                    partitionMap.put("partition" + partitionerCounter.incrementAndGet(), context);
                    log.debug("Adding to the partition map {}, listOfInstallation {}", partitionerCounter.get(), listOfInstallation);
                });
    } while (result.hasNext());

    log.debug("END: Created Partitions for installation job of size:{}", partitionMap.size());
    return partitionMap;
}

1 Ответ

0 голосов
/ 20 ноября 2018

мне нужно передать в качестве входных данных для задания список идентификаторов, я бы хотел, чтобы из этого списка идентификаторов можно было перейти к шагу, который мог бы запустить все из них параллельно

Вы можете разделить этот список и использовать разделенный шаг для параллельной обработки разделов.

Есть ли какие-либо рекомендации, которые вы могли бы мне предоставить?

Если вы выбираете разделенный шаговый маршрут (который мне подходит для вашего варианта использования), я бы рекомендовал не создавать раздел для идентификатора (если у вас нет разумного количества идентификаторов).Например, вы можете создать раздел для каждого диапазона идентификаторов и заставить каждый рабочий шаг выполнять логику чтения / обработки / записи, которую вы описали, что, безусловно, может выполняться параллельно.

Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...