Добавление глобального магазина для трансформатора для потребления - PullRequest
0 голосов
/ 03 марта 2019

Есть ли способ добавить глобальный магазин для использования Трансформером?В документах для трансформатора написано:

"Преобразовать каждую запись входного потока в ноль или более записей в выходном потоке (ключ и тип значения могут быть изменены произвольно). Трансформатор (предоставляется даннымTransformerSupplier) применяется к каждой входной записи и вычисляет ноль или более выходных записей. Чтобы назначить состояние, состояние должно быть создано и зарегистрировано заранее через хранилища, добавленные через addStateStore или addGlobalStore, прежде чем они могут быть подключены к Transformer "

пока API для addGlobalStore on принимает ProcessSupplier?

addGlobalStore (storeBuilder: StoreBuilder [_ <: StateStore], topic: String, потребляется: Consumed [_, <em>], stateUpdateSupplier: ProcessorSupplier)[, _])

Моя конечная цель - использовать DSL Kafka Streams с преобразователем, поскольку мне требуется flatMap и преобразовывать ключи и значения в мою тему вывода.У меня нет топологии в моей топологии, хотя.

Я ожидал бы что-то вроде этого:

addGlobalStore (storeBuilder: StoreBuilder [_ <: StateStore], тема: String, потребляется: потребляется [_,], stateUpdateSupplier: TransformerSupplier [, _]) </p>

Ответы [ 2 ]

0 голосов
/ 16 мая 2019

Используйте Processor вместо Transformer для всех преобразований, которые вы хотите выполнить в теме ввода, всякий раз, когда есть сценарий поиска данных из GlobalStateStore.Используйте context.forward(key,value,childName) для отправки данных в нисходящие узлы.context.forward(key,value,childName) может вызываться несколько раз в process() и punctuate(), чтобы отправлять несколько записей в нисходящий узел.Если требуется обновить GlobalStateStore, делайте это только в Processor , передаваемом в addGlobalStore(..), поскольку существует GlobalStreamThread, связанный с GlobalStateStore, который сохраняет состояние хранилища согласованным для всех работающих экземпляров kstream.

0 голосов
/ 03 марта 2019

Processor, который передается в addGlobalStore(), используется для сохранения (т.е. записи) хранилища.Обратите внимание, что ожидается, что Processor копирует данные как есть в хранилище (см. https://issues.apache.org/jira/browse/KAFKA-7663).

. После добавления глобального хранилища вы также можете добавить Transformer и Transformer может получить доступ к хранилищу. Обратите внимание, что не требуется подключать глобальное хранилище, чтобы сделать его доступным (необходимо добавить только «обычные» хранилища). Также обратите внимание, что Transformer получает доступ только для чтения к глобальномумагазины.

...