Потоки Кафки создают простой материализованный вид - PullRequest
0 голосов
/ 02 июля 2019

У меня есть события, приходящие в Кафку с кучей неуникальных полей String и отметкой времени события. Я хочу создать материализованное представление об этих событиях, чтобы я мог запросить их. Например:

  1. Показать все события
  2. Показать все события, где field1 = some string
  3. Показать все события, которые соответствуют нескольким полям
  4. Показать события между 2 датами

Все примеры, которые я видел, имеют агрегацию, объединение или некоторую другую преобразовательную операцию в потоке. Я не могу найти ни одного простого примера создания представления для ряда событий. Я не хочу выполнять какие-либо операции, я просто хочу иметь возможность запрашивать исходные события, поступающие в поток.

Я использую Spring Kafka, поэтому пример с Spring Kafka будет идеальным.

Я могу получать сообщения в Кафку и использовать их. Однако я не смог создать материализованное представление.

У меня есть следующий код, который фильтрует события (не совсем то, что я хотел, я хочу все события, но я просто хотел посмотреть, смогу ли я получить материализованное представление):

@StreamListener
    public void process(@Input("input") KTable<String,MyMessage> myMessages) {
        keyValueStore = interactiveQueryService.getQueryableStore(ALL_MESSAGES,QueryableStoreTypes.keyValueStore());

        myMessages.filter((key,value) -> (value.getKey() != null));
                Materialized.<String,MyMessage,KeyValueStore<Bytes,byte[]>> as(ALL_MESSAGES)
                .withKeySerde(Serdes.String())
                .withValueSerde(new MyMessageSerde());

Это исключение:

java.lang.ClassCastException: [B cannot be cast to MyMessage
at org.apache.kafka.streams.kstream.internals.KTableFilter.computeValue(KTableFilter.java:57)
    at org.apache.kafka.streams.kstream.internals.KTableFilter.access$300(KTableFilter.java:25)
    at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:79)
    at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:63)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:284)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:239)
    ... 21 more

Я не понимаю почему, потому что я установил valueSerde хранилища на MyMessageSerde, который знает, как сериализовать / десериализовать MyMessage в байтовый массив.

Обновление

Я изменил код на следующий:

myMessages.filter((key,value) -> (value.getKey() != null));

и добавил следующее в мой application.yml

spring.cloud.stream.kafka.streams.bindings.input:
  consumer:
    materializedAs: all-messages
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: MyMessageDeserializer  `

Теперь я получаю следующую трассировку стека:

Exception in thread "raven-a43f181b-ccb6-4d9b-a8fd-9fe96542c210-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_3] Failed to flush state store all-messages
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:202)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:420)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:394)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:382)
at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1042)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: java.lang.ClassCastException: [B cannot be cast to MyMessage
at org.apache.kafka.streams.kstream.internals.KTableFilter.computeValue(KTableFilter.java:57)
at org.apache.kafka.streams.kstream.internals.KTableFilter.access$300(KTableFilter.java:25)
at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:79)
at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:63)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:284)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:239)
... 12 more`

Ответы [ 2 ]

0 голосов
/ 06 июля 2019

Тип запросов, которые вы хотите, не поддерживается легко.Обратите внимание, что не существует вторичных индексов, а поддерживаются только регулярные поиски и диапазоны на основе ключей.

Если вы знаете все запросы заранее, вы можете перегруппировать данные в производные KTables, которыеиметь атрибут запроса в качестве ключа.Обратите внимание, что ключи должны быть уникальными, и, следовательно, если атрибут запроса содержит неуникальные данные, вам нужно будет использовать в качестве значения некоторый тип Collection:

KTable originalTable = builder.table(...)
KTable keyedByFieldATable = originalTable.groupBy(/*select field A*/).aggregate(/* the aggregation return a list or similar of entries for the key*/);

Обратите внимание, что вы дублируете требования к хранилищукаждый раз, когда вы повторно вводите исходную таблицу.

В качестве альтернативы вы можете выполнить полное сканирование таблицы по исходной таблице и оценить условия фильтрации при использовании возвращенного итератора.

Этопространство против ЦП.Возможно, Kafka Streams не подходит для вашей проблемы.

0 голосов
/ 03 июля 2019

Мне удалось создать материализованное представление следующим образом:

Конфигурация в application.yml

spring.cloud.stream.kafka.streams.bindings.input:
  consumer:
    materializedAs: all-messages
    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    valueSerde: com.me.MyMessageSerde
  producer:
    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    valueSerde: com.me.MyMessageSerde`  

Это устанавливает правильные сериализаторы и материализованное представление.

Следующий код создает таблицу KTable, которая материализует представление с использованием вышеуказанной конфигурации.

public void process(@Input("input") KTable<String,MyMessage> myMessages) {
}
...