Потоки Кафки объединяют несвязанные потоки - PullRequest
3 голосов
/ 04 февраля 2020

У меня есть поток событий, который мне нужно сопоставить с топологией ktable / changelog c, но сопоставление выполняется путем сопоставления с образцом для свойства записей ktable. поэтому я не могу присоединиться к потокам на основе ключа, поскольку я пока не знаю, какой из них соответствует.

пример:

ktable X:

{
  [abc]: {id: 'abc', prop: 'some pattern'},
  [efg]: {id: 'efg', prop: 'another pattern'}
}

поток A:

{ id: 'xyz', match: 'some pattern'}

поэтому stream A должен переслать что-то вроде {match: 'abc'}

Так что мне в основном нужно перебрать записи ktable и найти подходящую запись по сопоставлению с образцом на этом свойство.

Было бы целесообразно создать глобальное хранилище состояний на основе ktable, а затем получить к нему доступ из API процессора и перебрать записи?

Я также мог бы объединить все записи ktable в 1 коллекцию, а затем присоединиться к «поддельному» ключу? Но это также кажется довольно хакерским.

Или я просто заставляю что-то, что на самом деле не является потоковым, и просто помещаю это в кэш redis с обычным потребительским API, что также довольно неудобно, так как я предпочитаю его поддерживать. by rockDB.

edit: я думаю, это как-то связано с этим вопросом

Ответы [ 2 ]

0 голосов
/ 10 февраля 2020

Обычно я сопоставляю данные таблицы, чтобы получить нужный мне ключ соединения. Недавно у нас был похожий случай, когда мы должны были объединить поток с соответствующими данными в KTable. В нашем случае ключ потока был первой частью ключа таблицы, поэтому мы могли группировать по этой первой части ключа и объединять результаты в виде списка. В конце это выглядело примерно так.

final KTable<String, ArrayList<String>> theTable = builder
        .table(TABLE_TOPIC, Consumed.with(keySerde, Serdes.String()))
        .groupBy((k, v) -> new KeyValue<>(k.getFirstKeyPart(), v))
        .aggregate(
                ArrayList::new,
                (key, value, list) -> {
                    list.add(value);
                    return list;
                },
                (key, value, list) -> {
                    list.remove(value);
                    return list;
                },
                Materialized.with(Serdes.String(), stringListSerde));

final KStream<String, String> theStream = builder.stream(STREAM_TOPIC);

theStream
        .join(theTable, (streamEvent, tableEventList) -> tableEventList)
        .flatMapValues(value -> value)
        .map(this::doStuff)
        .to(TARGET_TOPIC);

Я не уверен, возможно ли это для вас, то есть, возможно, вы можете каким-то образом сопоставить данные таблицы с соединением.

I Я знаю, что это не совсем относится к вашему делу, но я надеюсь, что это все равно поможет. Может быть, вы можете немного уточнить, как будет выглядеть соответствие для вашего случая.

0 голосов
/ 10 февраля 2020

A GlobalKTable не будет работать, потому что объединение stream-globalTable позволяет извлекать неключевой атрибут соединения из потока - но поиск в таблице по-прежнему основан на ключе таблицы.

Однако вы можете прочитать входную таблицу topi c как KStream, извлечь атрибут соединения, установить его в качестве ключа и выполнить агрегацию, которая возвращает коллекцию (ie, List, Set, et c). Таким образом, вы можете выполнить объединение потоковой таблицы на ключе, за которым следует flatMapValues() (или flatMap()), который разбивает результат объединения на несколько записей (в зависимости от того, сколько записей в коллекции таблицы) .

Пока ваш атрибут объединения имеет не слишком много дубликатов (для входных данных таблицы topi c), и, таким образом, коллекция значений в таблице не становится слишком большой, это должно работать нормально. Вам нужно будет предоставить пользовательское значение-Serde для (де) сериализации данных сбора.

...