Я настраиваю приложение Kafka Streams, которое использует тему (срок хранения: 14 дней, cleanup.policy: удалить, разделы: 1). Я хочу использовать сообщения и вывести их в другую тему (удержание: -1, cleanup.policy: compact, partitions: 3).
Группировка осуществляется по ключу в теме ввода. Итак: Input-topic:
Key: A Value: { SomeJson }
Key: A Value: { Other Json}
Key: B Value: { TestJson }
Output:
Key: A Value: {[ { SomeJson }, { Other Json } ]}
Key: B Value: {[ { TestJson } ]}
Важно, чтобы содержимое в выходной теме никогда не терялось, поэтому все равно: все и 3x реплики. Каждый ключ в уплотненной теме будет содержать около 100 записей JSON. Приблизительно менее 20 КБ для каждого ключа.
Я также надеялся, что тема вывода работала как тема состояния, так что ей не пришлось бы создавать другую тему, содержащую ту же информацию.
Кто-нибудь знает, как это сделать? Большинство примеров, которые я нахожу, относятся к работе с окнами: https://github.com/confluentinc/kafka-streams-examples/tree/5.3.1-post/src/main/java/io/confluent/examples/streams
Текущий код:
val mapper = new ObjectMapper();
builder.stream(properties.getInputTopic(), Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey()
.aggregate(
() -> new GroupedIdenthendelser(Collections.emptyList()),
(key, value, currentAggregate) -> {
val items = new ArrayList<>(currentAggregate.getIdenthendelser());
items.add(value);
return new GroupedIdenthendelser(items);
},
Materialized.with(Serdes.String(), new JsonSerde<>(GroupedIdenthendelser.class, mapper)))
.toStream()
.to(properties.getOutputTopic(), Produced.with(Serdes.String(), new JsonSerde<>(mapper)));
Если у кого-то есть какие-то другие советы, пожалуйста, сообщите, поскольку эти данные являются информацией о клиенте, поэтомуесли есть какой-то конфиг, я должен настроить, сказать. Или, если вы знаете, что некоторые посты / примеры в блоге приветствуются.
Edit: Пример кода выше, кажется, работает, но он создает свою собственную тему состояния, которая не нужна, так кактема вывода всегда будет содержать одно и то же состояние. Также будет только 1 приложение, работающее с этим, поскольку входная тема имеет 1 раздел, и поскольку он относится к людям довольно фиксированного размера (10 000 000 человек дают или принимают), размер данных не будет превышать 20 КБ на человека. или. Событие в секунду оценивается примерно в 1 / с, поэтому нагрузка на него также невелика.
Топология:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
--> KSTREAM-AGGREGATE-0000000002
Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
--> KTABLE-TOSTREAM-0000000003
<-- KSTREAM-SOURCE-0000000000
Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
--> KSTREAM-SINK-0000000004
<-- KSTREAM-AGGREGATE-0000000002
Sink: KSTREAM-SINK-0000000004 (topic: output-topic)
<-- KTABLE-TOSTREAM-0000000003