Как я могу получить доступ к оригинальным записям при присоединении к Kafka Streams - PullRequest
1 голос
/ 16 апреля 2019

У меня есть работающее приложение Kafka Streams, которое в настоящее время создает два KStreams из двух разных тем.Эта часть работает просто отлично.

Теперь я хочу присоединиться к ним и получить «агрегированную запись» значения в первом и значения во втором.Ключи - это простые строки Java, а значения - это GenericRecords в аврокодировке.

Основываясь на документации, я смогу сделать что-то вроде этого:

    KStream<String, GenericAvroSerde> joined =
        inputTopicStartKStream.leftJoin(inputTopicEndKStream,
        (left, right) -> { ??? }
        JoinWindows.of(Duration.ofHours(24)),
        Joined.with(
            stringSerde,
            genericAvroSerde,
            genericAvroSerde)
    );

Однако это не такясно из документов или учебных пособий, которые я нашел в Интернете, что я могу сделать в разделе выше, где написано { ??? }.Я пробовал несколько вариантов выше, но не повезло.Я использую версию Kakfa Streams 2.2.0, если это имеет значение.

Я просто хочу иметь выходной поток <key, merge value1 + value2> для записей, которые поступают в оба потока с одним и тем же ключом.Я могу сделать слияние значений вручную, но не совсем понятно, как получить доступ к значениям с правой стороны лямбды.

1 Ответ

3 голосов
/ 16 апреля 2019

В ValueJoiner (left, right) -> { ??? }, left представляет значение из левого потока, а right представляет значение из правого потока

Все, что вам нужно сделать, это добавить свой код в ValueJoiner, как показано ниже:

import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;

KStream<String, GenericAvroSerde> joined =
    inputTopicStartKStream.leftJoin(inputTopicEndKStream,
    (left, right) -> {
             // You can get access to the generic Avro record by
             // casting both left and right values 
             Record leftRecord = (Record) left;
             Record rightRecord = (Record) right;

             // For the original question, you can simply create a new GenericRecord 
             // with the contents of left and right records
             GenericRecord record = new GenericData.Record(schema);
             record.put("left", left);
             record.put("right", right);
    }
    JoinWindows.of(Duration.ofHours(24)),
    Joined.with(
        stringSerde,
        genericAvroSerde,
        genericAvroSerde)
);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...