Java Streams - Буферизация огромных потоков - PullRequest
4 голосов
/ 09 апреля 2020

Я пытаюсь объединить несколько потоков, подкрепленных огромными объемами данных, в один, а затем буферизировать их. Я могу без проблем объединить эти потоки в один поток элементов. Однако когда я пытаюсь буферизовать / разделить потоки на потоки, он пытается полностью буферизовать первый поток, который мгновенно заполняет мою память.

Мне потребовалось некоторое время, чтобы сузить проблему до минимального теста, но ниже приведен код.

Я могу реорганизовать вещи, чтобы я не столкнулся с этой проблемой, но, не понимая, почему именно это взрывается, я чувствую, что использование потоков - это просто бомба замедленного действия.

Я взял вдохновение от Оператор буфера на Java 8 потоков для буферизации.

import java.util.*;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class BreakStreams
{

   //@see https://stackoverflow.com/questions/47842871/buffer-operator-on-java-8-streams
   /**
    * Batch a stream into chunks
    */
   public static <T> Stream<List<T>> buffer(Stream<T> stream, final long count)
   {
      final Iterator<T> streamIterator = stream.iterator();

      return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<List<T>>()
      {
         @Override public boolean hasNext()
         {
            return streamIterator.hasNext();
         }

         @Override public List<T> next()
         {
            List<T> intermediate = new ArrayList<>();
            for (long v = 0; v < count && hasNext(); v++)
            {
               intermediate.add(streamIterator.next());
            }
            return intermediate;
         }
      }, 0), false);
   }

   public static void main(String[] args)
   {

      //create streams from huge datasets
      Stream<Long> streams = Stream.of(LongStream.range(0, Integer.MAX_VALUE).boxed(),
                                       LongStream.range(0, Integer.MAX_VALUE).boxed())
                                   //collapse into one stream
                                   .flatMap(x -> x);
      //iterating over the stream one item at a time is OK..
//      streams.forEach(x -> {

      //buffering the stream is NOT ok, you will go OOM
      buffer(streams, 25).forEach(x -> {
         try
         {
            Thread.sleep(2500);
         }
         catch (InterruptedException ignore)
         {
         }
         System.out.println(x);
      });
   }
}

1 Ответ

6 голосов
/ 09 апреля 2020

Похоже, это связано со старой проблемой « Почему filter () после flatMap ()« не полностью »ленив в Java потоках? ». Хотя эта проблема была исправлена ​​для встроенных операций Stream, она, кажется, все еще существует, когда мы пытаемся выполнить итерацию по потоку с плоским отображением извне.

Мы можем упростить код, чтобы воспроизвести проблему до

Stream.of(LongStream.range(0, Integer.MAX_VALUE))
    .flatMapToLong(x -> x)
    .iterator().hasNext();

Обратите внимание, что использование Spliterator также влияет на

Stream.of(LongStream.range(0, Integer.MAX_VALUE))
    .flatMapToLong(x -> x)
    .spliterator()
    .tryAdvance((long l) -> System.out.println("first item: "+l));

Оба пытаются буферизовать элементы до тех пор, пока в конечном итоге не выйдут с помощью OutOfMemoryError.

, поскольку spliterator().forEachRemaining(…), похоже, не В этом случае вы можете реализовать решение, которое будет работать для вашего варианта использования forEach, но это будет fr agile, поскольку оно по-прежнему будет создавать проблему для коротких замыканий потоковых операций.

public static <T> Stream<List<T>> buffer(Stream<T> stream, final int count) {
    boolean parallel = stream.isParallel();
    Spliterator<T> source = stream.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<List<T>>(
            (source.estimateSize()+count-1)/count, source.characteristics()
                &(Spliterator.SIZED|Spliterator.DISTINCT|Spliterator.ORDERED)
                    | Spliterator.NONNULL) {
            List<T> list;
            Consumer<T> c = t -> list.add(t);
            @Override
            public boolean tryAdvance(Consumer<? super List<T>> action) {
                if(list == null) list = new ArrayList<>(count);
                if(!source.tryAdvance(c)) return false;
                do {} while(list.size() < count && source.tryAdvance(c));
                action.accept(list);
                list = null;
                return true;
            }
            @Override
            public void forEachRemaining(Consumer<? super List<T>> action) {
                source.forEachRemaining(t -> {
                    if(list == null) list = new ArrayList<>(count);
                    list.add(t);
                    if(list.size() == count) {
                        action.accept(list);
                        list = null;
                    }
                });
                if(list != null) {
                    action.accept(list);
                    list = null;
                }
            }
        }, parallel);
}

Но обратите внимание, что решения на основе Spliterator в целом предпочтительнее, так как они поддерживают передачу дополнительной информации, позволяющей оптимизировать работу, и во многих случаях используют более низкие затраты на итерацию. Так что это путь к go после исправления этой проблемы в коде JDK.

В качестве обходного пути вы можете использовать Stream.concat(…) для объединения потоков, но у него есть явное предупреждение о том, что не нужно объединять слишком много потоков одновременно в его документации :

Будьте осторожны при построении потоков из повторной конкатенации. Доступ к элементу глубоко конкатенированного потока может привести к глубоким цепочкам вызовов или даже к StackOverflowException [sic].

Имя throwable было исправлено до StackOverflowError в документации Java 9

...