HazelcastJet кафка дросселирование - PullRequest
0 голосов
/ 19 февраля 2020

Я не смог найти никаких возможностей для создания конвейера в hazelcast-jet-kafka, который бы ограничивал пропускную способность указанным c количеством элементов в единицу времени, кто-нибудь мог бы предложить мне возможные решения? Я знаю, что у альпака (https://doc.akka.io/docs/alpakka-kafka/current/) есть такая функциональность

1 Ответ

1 голос
/ 19 февраля 2020

Вы можете определить эту функцию:

private <T, S extends GeneralStage<T>> FunctionEx<S, S> throttle(int itemsPerSecond) {
    // context for the mapUsingService stage
    class Service {
        final int ratePerSecond;
        final TreeMap<Long, Long> counts = new TreeMap<>();

        public Service(int ratePerSecond) {
            this.ratePerSecond = ratePerSecond;
        }
    }

    // factory for the service
    ServiceFactory<?, Service> serviceFactory = ServiceFactories
            .nonSharedService(procCtx ->
                    // divide the count for the actual number of processors we have
                    new Service(Math.max(1, itemsPerSecond / procCtx.totalParallelism())))
            // non-cooperative is needed because we sleep in the mapping function
            .toNonCooperative();

    return stage -> (S) stage
        .mapUsingService(serviceFactory,
            (ctx, item) -> {
                // current time in 10ths of a second
                long now = System.nanoTime() / 100_000_000;
                // include this item in the counts
                ctx.counts.merge(now, 1L, Long::sum);
                // clear items emitted more than second ago
                ctx.counts.headMap(now - 10, true).clear();
                long countInLastSecond =
                        ctx.counts.values().stream().mapToLong(Long::longValue).sum();
                // if we emitted too many items, sleep a while
                if (countInLastSecond > ctx.ratePerSecond) {
                    Thread.sleep(
                        (countInLastSecond - ctx.ratePerSecond) * 1000/ctx.ratePerSecond);
                }
                // now we can pass the item on
                return item;
            }
        );
}

Затем использовать ее для регулирования в конвейере:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(IntStream.range(0, 2_000).boxed().toArray(Integer[]::new)))
 .apply(throttle(100))
 .writeTo(Sinks.noop());

Выполнение вышеуказанного задания займет около 20 секунд, так как оно имеет 2000 предметов и скорость ограничена до 100 предметов / с. Скорость оценивается за последнюю секунду, поэтому, если количество предметов меньше 100, предметы будут отправлены немедленно. Если в течение одной миллисекунды будет 101 элемент, 100 будут перенаправлены немедленно, а следующий - после сна.

Также убедитесь, что ваш источник распространен. Скорость делится на количество процессоров в кластере, и если ваш источник не распределен, а некоторые участники не видят никаких данных, ваша общая скорость будет только частью желаемой скорости.

...