Как вы группируете потоки в наборах заданного размера? - PullRequest
1 голос
/ 13 апреля 2020

У меня есть поток данных из базы данных, и я хочу использовать Java streams API для итерации и выбрать X их количество и вывести его в другой поток. Порядок не имеет значения .

Что-то вроде

source
.collect(x -> collectNItems(10)) // basically from the stream choose 10 items
.flatmap(collectedItems -> Stream.of(collectedItems))
.map(x -> buildElasticSearchBatchInsertRequest(x))
.forEach(request -> insertToElasticSearch(request));

говорит, что источник 1,2,3,4,5,6,7,8,9,10,11,12,13,14

После flatMap я должен получить поток из

(12, 1, 14, 3)
(2, 4, 5 , 8, 7, 6, 10, 11, 9, 13)

(опять-таки порядок не имеет значения) Мне просто нужно, чтобы он был сгруппирован вместе.

Основным вариантом использования для этого было массовое вставление в Elasticsearch на основе данных базы данных, как единое целое. в то время как вставки выполняются медленно, а массовая вставка целиком занимает много памяти.

1 Ответ

1 голос
/ 14 апреля 2020

Найден один, похороненный в https://code-examples.net/en/q/1d38ce7

Для использования

@Test
public void try2() {
  Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
  BatchSpliterator.decorate(stream, 10)
    .forEach(System.out::println);
}

И в декораторе BatchSpliterator, только немного подправлен из исходной ссылки, так как предполагаемый размер должен возвращаться MAX_VALUE, если неизвестен

public class BatchSpliterator<E> implements Spliterator<List<E>> {

    public static <E> Stream<List<E>> decorate(Stream<E> originalStream, int batchSize) {
        return StreamSupport.stream(new BatchSpliterator<>(originalStream.spliterator(), batchSize), originalStream.isParallel());
    }

    private final Spliterator<E> base;

    private final int batchSize;

    private BatchSpliterator(Spliterator<E> base, int batchSize) {
        this.base = base;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final List<E> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            base.tryAdvance(batch::add);
        }
        if (batch.isEmpty()) {
            return false;
        }
        action.accept(batch);
        return true;
    }

    @Override
    public Spliterator<List<E>> trySplit() {
        if (base.estimateSize() <= batchSize)
            return null;
        final Spliterator<E> splitBase = this.base.trySplit();
        return splitBase==null ? null
            :new BatchSpliterator<>(splitBase, batchSize);
    }

    @Override
    public long estimateSize() {
        final long baseSize = base.estimateSize();
        return baseSize==Long.MAX_VALUE ? baseSize
            :(long) Math.ceil(baseSize / (double) batchSize);
    }

    @Override
    public int characteristics() {
        return base.characteristics();
    }

    @Override
    public boolean hasCharacteristics(int characteristics) {
        return base.hasCharacteristics(characteristics);
    }

    @Override
    public Comparator<? super List<E>> getComparator() {
        throw new UnsupportedOperationException("getComparator");
    }
}
...