Получить ключ записи с внутренним объединением в Kafka Streams DSL - PullRequest
0 голосов
/ 16 мая 2019

Есть ли способ передать или получить доступ к ключу сообщения из раздела join в соединении Kafka Stream DSL?

У меня сейчас что-то вроде этого:

    KStream<String, GenericRecord> completedEventsStream = inputStartKStream.
        join(
            inputEndKStream,
            (leftValue, rightValue) -> customLambda((Record) leftValue, (Record) rightValue),
            JoinWindows.of(windowDuration),
            Joined.with(stringSerde, genericAvroSerde, genericAvroSerde)
        );

Однако записи leftValue и rightValue, переданные в customLambda, не имеют доступа к ключу сообщения kafka, потому что это отдельная строка. Единственный контент, который у них есть, - это само сообщение, а не ключ.

Есть ли способ получить доступ к ключу внутри лямбда-соединения? Одна вещь, которую я мог бы сделать, это просто добавить ключ сообщения как часть самого сообщения и получить к нему доступ как к обычному полю, но мне было интересно, если инфраструктура предоставляет способ прямого доступа к нему?

1 Ответ

2 голосов
/ 16 мая 2019

В большинстве случаев ключ также доступен в значении записи, разве это не так для вашего приложения?

Похоже, что интерфейс ValueJoiner имеет улучшение , поданное какчасть KIP-149 , но не была завершена, как другие методы в этом KIP: ValueTransformer и ValueMapper.

Вы можете добавить шаг перед соединением, чтобы извлечь ключ ивключите его в значение вашего сообщения перед выполнением объединения, используя ValueMapperWithKey.

...