Топология Kafka Stream на нескольких экземплярах - PullRequest
0 голосов
/ 11 сентября 2018

У нас есть топология потоков, которая будет работать на нескольких машинах. Мы храним результаты агрегации с временными окнами в государственных хранилищах. Так как хранилища состояния хранят локальные данные, я думаю, что агрегацию следует проводить по другой теме для общей агрегации. Но похоже, что я что-то упускаю, потому что ни один из примеров не выполняет общее агрегирование на другом KStream или Процессоре.

Нужно ли нам использовать логику groupBy для хранения общей агрегации, или использовать GlobalKtable, или просто где-нибудь реализовать наш собственный код слияния?

Какова правильная архитектура для этого?

В приведенном ниже коде я попытался сгруппировать все сообщения, поступающие на процессор, с постоянным ключом, чтобы сохранить общую агрегацию только на одной машине, но, по-моему, он потерял бы параллелизм, который предоставляет Kafka.

dashboardItemProcessor = streamsBuilder.stream("Topic25", Consumed.with(Serdes.String(), eventSerde))
        .filter((key, event) -> event != null && event.getClientCreationDate() != null);

 dashboardItemProcessor.map((key, event) -> KeyValue.pair(key, event.getClientCreationDate().toInstant().toEpochMilli()))
       .groupBy((key, event) -> "count", Serialized.with(Serdes.String(), Serdes.Long()))
       .windowedBy(timeWindow)
       .count(Materialized.as(dashboardItemUtil.getStoreName(itemId, timeWindow)));

1 Ответ

0 голосов
/ 12 сентября 2018

В приведенном ниже коде я попытался сгруппировать все сообщения, поступающие на процессор, с постоянным ключом для хранения общего агрегирования только на одной машине, но, как мне кажется, он потерял бы параллелизм, который предоставляет Kafka.

Кажется, это правильный подход.И да, вы теряете параллелизм, но так работает глобальная агрегация.В конце концов, одна машина должна его вычислить ...

Что вы могли бы улучшить, это сделать двухэтапный подход: т.е. сначала агрегировать по "случайным" ключам параллельно, и использовать второй шаг столько один ключ для «объединения» частичных агрегатов в один.Таким образом, некоторые части вычислений распараллеливаются, и только последний шаг (при сниженной нагрузке на данные) не является параллельным.Используя Kafka Streams, вам нужно реализовать этот подход «вручную».

...