Вы можете определить эту функцию:
private <T, S extends GeneralStage<T>> FunctionEx<S, S> throttle(int itemsPerSecond) {
// context for the mapUsingService stage
class Service {
final int ratePerSecond;
final TreeMap<Long, Long> counts = new TreeMap<>();
public Service(int ratePerSecond) {
this.ratePerSecond = ratePerSecond;
}
}
// factory for the service
ServiceFactory<?, Service> serviceFactory = ServiceFactories
.nonSharedService(procCtx ->
// divide the count for the actual number of processors we have
new Service(Math.max(1, itemsPerSecond / procCtx.totalParallelism())))
// non-cooperative is needed because we sleep in the mapping function
.toNonCooperative();
return stage -> (S) stage
.mapUsingService(serviceFactory,
(ctx, item) -> {
// current time in 10ths of a second
long now = System.nanoTime() / 100_000_000;
// include this item in the counts
ctx.counts.merge(now, 1L, Long::sum);
// clear items emitted more than second ago
ctx.counts.headMap(now - 10, true).clear();
long countInLastSecond =
ctx.counts.values().stream().mapToLong(Long::longValue).sum();
// if we emitted too many items, sleep a while
if (countInLastSecond > ctx.ratePerSecond) {
Thread.sleep(
(countInLastSecond - ctx.ratePerSecond) * 1000/ctx.ratePerSecond);
}
// now we can pass the item on
return item;
}
);
}
Затем использовать ее для регулирования в конвейере:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(IntStream.range(0, 2_000).boxed().toArray(Integer[]::new)))
.apply(throttle(100))
.writeTo(Sinks.noop());
Выполнение вышеуказанного задания займет около 20 секунд, так как оно имеет 2000 предметов и скорость ограничена до 100 предметов / с. Скорость оценивается за последнюю секунду, поэтому, если количество предметов меньше 100, предметы будут отправлены немедленно. Если в течение одной миллисекунды будет 101 элемент, 100 будут перенаправлены немедленно, а следующий - после сна.
Также убедитесь, что ваш источник распространен. Скорость делится на количество процессоров в кластере, и если ваш источник не распределен, а некоторые участники не видят никаких данных, ваша общая скорость будет только частью желаемой скорости.