Потоки Кафки возвращают все записи, где fieldx = некоторое значение - PullRequest
0 голосов
/ 03 июля 2019

У меня есть записи, поступающие в kafka с несколькими неуникальными полями, давайте назовем их Field1 ... Field n.

Я хочу написать запрос, чтобы вернуть все записи, где fieldx = некоторое значение.Давайте рассмотрим следующий простой пример.Представьте, что в систему поступают заказы, а одно из полей в заказе - customerId.Основной операцией будет получение всех заказов для конкретного клиента.Как мне это сделать с Kafka Streams?

У меня уже есть KTable и материализованное представление всех записей, так что я мог бы просто просмотреть все записи в представлении и выбрать те, которые я хочу,но кажется, что это было бы неэффективно и дорого.

Я действительно хотел бы создать материализованное представление, в котором представление содержит записи, сгруппированные по fieldx, но я не вижу никакого способа сделать это.Похоже, вы можете использовать groupby только для агрегации, подсчета, сокращения и т. Д.

Есть идеи, как это сделать?

Ответы [ 2 ]

1 голос
/ 03 июля 2019

Вот пример фильтрации заказов по идентификатору клиента.Для этого запроса нет необходимости создавать таблицу KTable для группировки или объединения.Тем не менее, поскольку темы Kafka являются журналами только для добавления без вторичных индексов, вам действительно нужно перебирать все сообщения, чтобы найти поток заказов, соответствующий вашему идентификатору клиента.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orderStream = builder.stream("orders");
orderStream.filter((k,v) -> "customer-1".equals(v.customerId));

Обратите внимание, что в приведенном выше коде предполагается, что в вашем потоке заказов есть также ключи типа String, но эти ключи игнорируются.

Также обратите внимание, что вам нужно будет указать, как Kafka Streams должен десериализовать сообщения в ваш класс Order.Вы можете указать десериализаторы используя Consumed.with(...).

Полные примеры можно найти в репозитории примеров Kafka Streams на github: https://github.com/confluentinc/kafka-streams-examples

Не то чтобы запросы такого типа также можно было писать с использованием KSQL: https://www.confluent.io/stream-processing-cookbook/

0 голосов
/ 05 июля 2019

Вы должны сгруппировать ваш поток заказов по «customerID» и собрать все заказы в список.Результат KTable будет иметь <CustomerId, [List of Order]> типы событий.

Используя интерактивные запросы, вы можете запросить хранилище состояний,

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orderStream = builder.stream("orders");
KTable<String,ArrayList<Order>> orderTable = orderStream
      .groupBy((key,value)-> value .get("customerId"))
      .aggregate(()-> new ArrayList<Order>(),
                 (key,val,agg)-> agg.add(val),
                  Materialized.as("customer-orders")
                  .withValueSerde(ArrayListSerde())          
       ); 

. Он создаст материализованное представление "клиентские заказы", которое выможно запросить через конечную точку покоя.

Вы можете перейти по ссылке ниже для предоставления KTables в качестве конечной точки покоя:

https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html

...