Запустить массовую операцию как операцию промежуточного потока - PullRequest
0 голосов
/ 25 мая 2018

У меня есть поток Java неопределенной длины.Теперь мне нужно загрузить метаданные из базы данных и назначить их потоковым данным.

Я не могу:

  • загрузить все данные из потока в мою оперативную память сразу, заполнить метаданные и затем запустить новый поток, так как это может использовать много оперативной памяти.
  • загружайте метаданные для каждого элемента индивидуально, так как это приведет к переполнению моей базы данных слишком большим количеством запросов.

Таким образом, я подумал, что могу загрузить метаданные в разделах из базы данных.

Мне нужен такой метод:

<T> Stream<List<T>> partition(Stream<T> stream, int partitionSize)

, поэтому я могу использовать его следующим образом:

partition(dataSource.stream(), 1000)
    .map(metadataSource::populate)
    .flatMap(List::stream)
    .forEach(this::doSomething);

Я уже нашел Iteralbes # раздел Guava но это заставило бы меня преобразовать поток в итеративный, разделить его и снова преобразовать в поток.Есть ли что-то встроенное для разделения потока или есть простой способ реализовать это самостоятельно?

1 Ответ

0 голосов
/ 25 мая 2018

Я не нашел уже существующего метода, который бы это делал, поэтому я сам его реализовал:

public class Partitioner<E> implements Iterator<List<E>> {

    private final Iterator<E> iterator;
    private final int partitionSize;

    public static <T> Stream<List<T>> partition(final Stream<T> stream, final int partitionSize) {
        return new Partitioner<>(stream, partitionSize).asStream();
    }

    public Partitioner(final Stream<E> stream, final int partitionSize) {
        this(stream.iterator(), partitionSize);
    }

    public Partitioner(final Iterator<E> iterator, final int partitionSize) {
        this.iterator = iterator;
        this.partitionSize = partitionSize;
    }

    @Override
    public boolean hasNext() {
        return this.iterator.hasNext();
    }

    @Override
    public List<E> next() {
        if (!hasNext()) {
            throw new NoSuchElementException("No more elements");
        }
        final ArrayList<E> result = new ArrayList<>(this.partitionSize);
        for (int i = 0; i < this.partitionSize && hasNext(); i++) {
            result.add(this.iterator.next());
        }
        return result;
    }

    public Stream<List<E>> asStream() {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(this, Spliterator.NONNULL), false);
    }

}
...