Группировка Kafka Stream по полю значений KTable - PullRequest
1 голос
/ 16 мая 2019

У меня есть сценарий использования, где мой KTable выглядит следующим образом.

KTable : orderTable

Ключ: Значение

{123} : {id1,12}

{124} : {id2,10}

{125} : {id1,5}

{126} : {id2,11}

KTable : orderByIdTable => Эта таблица будет в значении groupBy field (id), а значение столбца счета будет иметь сумму id1=(12+5), id2=(10+11)

Ключ: значение

{id1} : {17}

{id2} : {21}

         final KTable<String, Order> orderTable = builder.table("order-topic");
         Don't know how to do this further.....
         final KTable<String,Long> orderByIdTable = ?

1 Ответ

2 голосов
/ 17 мая 2019

Вот пример кода (с использованием только примитивных типов Java, который позволил мне собрать их быстрее), который демонстрирует, как перекодировать или переразбить KTable, что приводит к созданию нового KTable.Вы сможете легко адаптировать его к своему примеру превращения KTable<String, Order> в KTable<String, Long>.

. Лично я бы выбрал вариант 2 для вашего варианта использования.

Примеры ниже, Не полностью протестировано , возможно, записи захоронения (сообщения с ненулевым ключом, но с нулевым значением, которые указывают, что ключ должен быть удален из таблицы) не обрабатываются должным образом.

final StreamsBuilder builder = new StreamsBuilder();
final KTable<Integer, String> table = builder.table(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()));

// Variant 1 (https://docs.confluent.io/current/streams/faq.html#option-1-write-kstream-to-ak-read-back-as-ktable)
// Here, we re-key the KTable, write the results to a new topic, and then re-read that topic into a new KTable.
table
    .toStream()
    .map((key, value) -> KeyValue.pair(value, key))
    .to(outputTopic1, Produced.with(Serdes.String(), Serdes.Integer()));
KTable<String, Integer> rekeyedTable1 =
    builder.table(outputTopic1, Consumed.with(Serdes.String(), Serdes.Integer()));

// Variant 2 (https://docs.confluent.io/current/streams/faq.html#option-2-perform-a-dummy-aggregation)
// Here, we re-key the KTable (resulting in a KGroupedTable), and then perform a dummy aggregation to turn the
// KGroupedTable into a KTable.
final KTable<String, Integer> rekeyedTable2 =
    table
        .groupBy(
            (key, value) -> KeyValue.pair(value, key),
            Grouped.with(Serdes.String(), Serdes.Integer())
        )
        // Dummy aggregation
        .reduce(
            (aggValue, newValue) -> newValue, /* adder */
            (aggValue, oldValue) -> oldValue  /* subtractor */
        );
rekeyedTable2.toStream().to(outputTopic2, Produced.with(Serdes.String(), Serdes.Integer()));
...