Агрегировать в сжатую тему с неограниченным сроком хранения - PullRequest
0 голосов
/ 02 ноября 2019

Я настраиваю приложение Kafka Streams, которое использует тему (срок хранения: 14 дней, cleanup.policy: удалить, разделы: 1). Я хочу использовать сообщения и вывести их в другую тему (удержание: -1, cleanup.policy: compact, partitions: 3).

Группировка осуществляется по ключу в теме ввода. Итак: Input-topic:

Key: A   Value: { SomeJson }
Key: A   Value: { Other Json}
Key: B   Value: { TestJson }

Output:

Key: A   Value: {[ { SomeJson }, { Other Json } ]}
Key: B   Value: {[ { TestJson } ]}

Важно, чтобы содержимое в выходной теме никогда не терялось, поэтому все равно: все и 3x реплики. Каждый ключ в уплотненной теме будет содержать около 100 записей JSON. Приблизительно менее 20 КБ для каждого ключа.

Я также надеялся, что тема вывода работала как тема состояния, так что ей не пришлось бы создавать другую тему, содержащую ту же информацию.

Кто-нибудь знает, как это сделать? Большинство примеров, которые я нахожу, относятся к работе с окнами: https://github.com/confluentinc/kafka-streams-examples/tree/5.3.1-post/src/main/java/io/confluent/examples/streams

Текущий код:

val mapper = new ObjectMapper();                                                                             

builder.stream(properties.getInputTopic(), Consumed.with(Serdes.String(), Serdes.String()))                   
        .groupByKey()                                                                                         
        .aggregate(                                                                                          
                () -> new GroupedIdenthendelser(Collections.emptyList()),                                    
                (key, value, currentAggregate) -> {                                                          
                    val items = new ArrayList<>(currentAggregate.getIdenthendelser());                       
                    items.add(value);                                                                        

                    return new GroupedIdenthendelser(items);                                                 
                },                                                                                           
                Materialized.with(Serdes.String(), new JsonSerde<>(GroupedIdenthendelser.class, mapper)))     
        .toStream()                                                                                           
        .to(properties.getOutputTopic(), Produced.with(Serdes.String(), new JsonSerde<>(mapper)));           

Если у кого-то есть какие-то другие советы, пожалуйста, сообщите, поскольку эти данные являются информацией о клиенте, поэтомуесли есть какой-то конфиг, я должен настроить, сказать. Или, если вы знаете, что некоторые посты / примеры в блоге приветствуются.

Edit: Пример кода выше, кажется, работает, но он создает свою собственную тему состояния, которая не нужна, так кактема вывода всегда будет содержать одно и то же состояние. Также будет только 1 приложение, работающее с этим, поскольку входная тема имеет 1 раздел, и поскольку он относится к людям довольно фиксированного размера (10 000 000 человек дают или принимают), размер данных не будет превышать 20 КБ на человека. или. Событие в секунду оценивается примерно в 1 / с, поэтому нагрузка на него также невелика.

Топология:

Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
      --> KSTREAM-AGGREGATE-0000000002
    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
      --> KTABLE-TOSTREAM-0000000003
      <-- KSTREAM-SOURCE-0000000000
    Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
      --> KSTREAM-SINK-0000000004
      <-- KSTREAM-AGGREGATE-0000000002
    Sink: KSTREAM-SINK-0000000004 (topic: output-topic)
      <-- KTABLE-TOSTREAM-0000000003

1 Ответ

0 голосов
/ 02 ноября 2019

Глядя на ваш пример набора данных, я думаю, что вам может понадобиться агрегация в реальном времени. Пожалуйста, обратите внимание на это сообщение в блоге Confluent в качестве отправной точки.

...