Показать прогресс обработки потока Java 8 - PullRequest
0 голосов
/ 10 июня 2018

У меня есть Stream, обрабатывающий несколько миллионов элементов.Алгоритм Map-Reduce, стоящий за ним, занимает несколько миллисекунд, поэтому выполнение задачи занимает около двадцати минут.

Stream<MyData> myStream = readData();
MyResult result = myStream
    .map(row -> process(row))
    .peek(stat -> System.out.println("Hi, I processed another item"))
    .reduce(MyStat::aggregate);

Я бы хотел отобразить общий прогресс, а не печатать строку на элемент (что приводит кв тысячах строк в секунду, занимает время и не дает никакой полезной информации об общем прогрессе).Я хотел бы показать что-то похожее на:

 5% (08s)
10% (14s)
15% (20s)
...

Каков наилучший (и / или самый простой) способ сделать это?

Ответы [ 3 ]

0 голосов
/ 10 июня 2018

Прежде всего, потоки не предназначены для решения подобных задач (в отличие от классической структуры данных).Если вы уже знаете, сколько элементов будет обрабатывать ваш поток, вы можете воспользоваться следующей опцией, которая, повторяю, не является целью потоков.

Stream<MyData> myStream = readData();
final AtomicInteger loader = new AtomicInteger();
int fivePercent = elementsCount / 20;
MyResult result = myStream
    .map(row -> process(row))
    .peek(stat -> {
        if (loader.incrementAndGet() % fivePercent == 0) {
            System.out.println(loader.get() + " elements on " + elementsCount + " treated");
            System.out.println((5*(loader.get() / fivePercent)) + "%");
        }
    })
    .reduce(MyStat::aggregate);
0 голосов
/ 10 июня 2018

Как уже отмечали другие: здесь есть некоторые оговорки.Прежде всего, потоки не должны использоваться для чего-то подобного.

На техническом уровне можно еще поспорить:

  • Поток может быть бесконечным
  • Даже если вы знаете количество элементов: это число может бытьискажается операциями, такими как filter или flatMap
  • Для параллельного потока отслеживание хода выполнения приведет к принудительной установке точки синхронизации
  • Если имеется терминал Операция , которая стоит дорого (например, агрегация в вашем случае), тогда сообщаемый прогресс может даже не заметно отражать время вычисления

ОднакоПомня об этом, один подход, который может быть разумным для вашего приложения, заключается в следующем:

Вы можете создать Function<T,T>, который передается в map потока.(По крайней мере, я предпочел бы это вместо использования peek в потоке, как предложено в другом ответе).Эта функция может отслеживать прогресс, используя AtomicLong для подсчета элементов.Чтобы отделить отдельные элементы, этот прогресс может быть затем просто передан в Consumer<Long>, который будет заботиться о презентации

«Представление» здесь относится к печати этого прогресса на консоли, нормализовано илив процентах, ссылаясь на размер, который может быть известен везде, где создается потребитель.Но потребитель может также позаботиться только о печати, например, каждого 10-го элемента, или распечатать сообщение, только если прошло не менее 5 секунд с момента предыдущего.

import java.util.Iterator;
import java.util.Locale;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamProgress
{
    public static void main(String[] args)
    {
        int size = 250;
        Stream<Integer> stream = readData(size);

        LongConsumer progressConsumer = progress -> 
        {
            // "Filter" the output here: Report only every 10th element
            if (progress % 10 == 0)
            {
                double relative = (double) progress / (size - 1);
                double percent = relative * 100;
                System.out.printf(Locale.ENGLISH,
                    "Progress %8d, relative %2.5f, percent %3.2f\n",
                    progress, relative, percent);
            }
        };

        Integer result = stream
            .map(element -> process(element))
            .map(progressMapper(progressConsumer))
            .reduce(0, (a, b) -> a + b);

        System.out.println("result " + result);
    }

    private static <T> Function<T, T> progressMapper(
        LongConsumer progressConsumer)
    {
        AtomicLong counter = new AtomicLong(0);
        return t -> 
        {
            long n = counter.getAndIncrement();
            progressConsumer.accept(n);
            return t;
        };

    }

    private static Integer process(Integer element)
    {
        return element * 2;
    }

    private static Stream<Integer> readData(int size)
    {
        Iterator<Integer> iterator = new Iterator<Integer>()
        {
            int n = 0;
            @Override
            public Integer next()
            {
                try
                {
                    Thread.sleep(10);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
                return n++;
            }

            @Override
            public boolean hasNext()
            {
                return n < size;
            }
        };
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(
                iterator, Spliterator.ORDERED), false);
    }
}
0 голосов
/ 10 июня 2018

Возможность сделать это сильно зависит от типа source, который вы имеете в stream.Если у вас есть коллекция, и вы хотите применить к ней некоторые операции, вы можете это сделать, потому что вы знаете, каков размер коллекции, и можете вести подсчет обработанных элементов.Но в этом случае есть предостережение.Если вы будете выполнять параллельные вычисления в потоке, это также станет более трудным.

В тех случаях, когда вы передаете данные из-за пределов приложения, очень трудно, чтобы вы могли смоделировать прогресс, как вы этого не делаете.не знаю, когда закончится поток.

...