Как уже отмечали другие: здесь есть некоторые оговорки.Прежде всего, потоки не должны использоваться для чего-то подобного.
На техническом уровне можно еще поспорить:
- Поток может быть бесконечным
- Даже если вы знаете количество элементов: это число может бытьискажается операциями, такими как
filter
или flatMap
- Для параллельного потока отслеживание хода выполнения приведет к принудительной установке точки синхронизации
- Если имеется терминал Операция , которая стоит дорого (например, агрегация в вашем случае), тогда сообщаемый прогресс может даже не заметно отражать время вычисления
ОднакоПомня об этом, один подход, который может быть разумным для вашего приложения, заключается в следующем:
Вы можете создать Function<T,T>
, который передается в map
потока.(По крайней мере, я предпочел бы это вместо использования peek
в потоке, как предложено в другом ответе).Эта функция может отслеживать прогресс, используя AtomicLong
для подсчета элементов.Чтобы отделить отдельные элементы, этот прогресс может быть затем просто передан в Consumer<Long>
, который будет заботиться о презентации
«Представление» здесь относится к печати этого прогресса на консоли, нормализовано илив процентах, ссылаясь на размер, который может быть известен везде, где создается потребитель.Но потребитель может также позаботиться только о печати, например, каждого 10-го элемента, или распечатать сообщение, только если прошло не менее 5 секунд с момента предыдущего.
import java.util.Iterator;
import java.util.Locale;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class StreamProgress
{
public static void main(String[] args)
{
int size = 250;
Stream<Integer> stream = readData(size);
LongConsumer progressConsumer = progress ->
{
// "Filter" the output here: Report only every 10th element
if (progress % 10 == 0)
{
double relative = (double) progress / (size - 1);
double percent = relative * 100;
System.out.printf(Locale.ENGLISH,
"Progress %8d, relative %2.5f, percent %3.2f\n",
progress, relative, percent);
}
};
Integer result = stream
.map(element -> process(element))
.map(progressMapper(progressConsumer))
.reduce(0, (a, b) -> a + b);
System.out.println("result " + result);
}
private static <T> Function<T, T> progressMapper(
LongConsumer progressConsumer)
{
AtomicLong counter = new AtomicLong(0);
return t ->
{
long n = counter.getAndIncrement();
progressConsumer.accept(n);
return t;
};
}
private static Integer process(Integer element)
{
return element * 2;
}
private static Stream<Integer> readData(int size)
{
Iterator<Integer> iterator = new Iterator<Integer>()
{
int n = 0;
@Override
public Integer next()
{
try
{
Thread.sleep(10);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
return n++;
}
@Override
public boolean hasNext()
{
return n < size;
}
};
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(
iterator, Spliterator.ORDERED), false);
}
}