У меня есть подпружиненное задание, в котором мне нужно передать в качестве входных данных задание список идентификаторов, я бы хотел, чтобы из этого списка идентификаторов можно было перейти к шагу, который мог бы выполнять все из них параллельно,На данный момент я выполнил несколько экземпляров задания в threadpoolExecutor, который выполняет задание x количество раз.Это подразумевает, что он выполняет одиночные запросы для всех заданий.И мы говорим о более чем 50 миллионах записей.Записи представляют временную серию @ конкретный день потребления.Мне нужно агрегировать id и batchId по месяцам и отправлять эту информацию брокеру.
- Reader -> считывает данные из базы данных в соответствии с id и временными метками, представляющими временной ряд.
- Процессор -> PassThroughItemProcessor
- Writer -> Отправить в AMQP (агрегирует список элементов)
Есть ли какие-либо рекомендации, которые вы могли бы предоставить мне?
Согласно предложениям, так выглядит мой разделитель;
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
log.debug("START: Partition");
Map<String, ExecutionContext> partitionMap = new HashMap<>();
final AtomicInteger counter = new AtomicInteger(0);
final AtomicInteger partitionerCounter = new AtomicInteger(0);
Page<Integer> result = null;
do {
result = repository.findDistinctByBatchId(LocalDateTime.parse(batchId, AipForecastService.DEFAULT_DATE_TIME_FORMATTER), Optional.ofNullable(result)
.map(Page::nextPageable)
.orElse(PageRequest.of(0, 100000)));
result
.stream()
.collect(Collectors.groupingBy(it -> counter.getAndIncrement() / 100))
.values()
.forEach(listOfInstallation -> {
ExecutionContext context = new ExecutionContext();
context.put("listOfInstallation", listOfInstallation);
partitionMap.put("partition" + partitionerCounter.incrementAndGet(), context);
log.debug("Adding to the partition map {}, listOfInstallation {}", partitionerCounter.get(), listOfInstallation);
});
} while (result.hasNext());
log.debug("END: Created Partitions for installation job of size:{}", partitionMap.size());
return partitionMap;
}