Как оптимизировать агрегацию так, чтобы агрегация для каждого потребителя выполнялась первой? - PullRequest
0 голосов
/ 30 декабря 2018

У меня есть тема kafka с именем input с несколькими разделами.

Допустим, сообщение выглядит следующим образом:

{
    "key": 123456, 
    "otherKey": 444, 
    ... 
}

Записи разделены по «ключу» (и поэтомуодин и тот же ключ всегда будет обрабатываться одним и тем же потребителем Kafka.

Теперь я хотел бы посчитать количество событий для каждого «otherKey» в минуту.Насколько я понимаю, это легко сделать, используя KStreams, например:

input.groupBy((k, v) -> v.getOtherKey())
     .windowedBy(TimeWindows.of(Duration.of(60, SECONDS)))
     .count()
     .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
     .toStream()
     .to("output");

С groupBy, Kafka Streams будет перераспределять данные во внутреннюю тему kafka, с 1 событием для каждого события вinput тема.

Мне это кажется пустой тратой.Он мог бы подсчитывать сообщения в каждом потребителе кафки (считая только для разделов потребителя) по «otherKey» и публиковать во внутренней теме только один раз в минуту для «otherKey».

Есть ли способ сделать этоиспользуя Kafka Streams?

1 Ответ

0 голосов
/ 30 декабря 2018

Ваши наблюдения о поведении верны, и ваша идея оптимизировать выполнение также верна.

Однако эта оптимизация в настоящее время не реализована.Причина в том, что suppress() - это совершенно новый оператор, и описанная вами оптимизация не имела смысла до введения suppress().

Если вы действительно хотите провести эту оптимизацию, вы можете построить ее с помощью ProcessorAPI, хотя.

...