Kafka Join 2 Streams (Кафка 1.0.1) - PullRequest
       30

Kafka Join 2 Streams (Кафка 1.0.1)

0 голосов
/ 24 августа 2018

У меня простой вопрос. Я хочу соединить 2 KStream s на одном ключе с GenericRecord в качестве значения:

final KStream<String, GenericRecord> obs = builder.stream("Observations");
final KStream<String, GenericRecord> foI = builder.stream("FeaturesOfInterest");

final KStream<String, GenericRecord> transformfoIT = foIT
    .map((key, value) -> KeyValue.pair(value.get("Observation").toString(), value)); 

final KStream<String, GenericRecord> merged = obsT.join(
    transformfoIT,
    (value, location) -> {
        value.put("FeatureOfInterest", location);
        System.out.println();
        return value;

    });

tranfsfromfoIT - это просто, так что установите клавиши правильно. Так что obs.key и tranformfoIT.key одинаковы для сообщений.

Но мое соединение не работает, потому что я получаю:

Метод join (KTable, ValueJoiner) в типе KStream не применим для аргументов (KStream, (значение, местоположение) -> {}) "

Понятия не имею, как это исправить.

Надеюсь, вы мне поможете.

1 Ответ

0 голосов
/ 25 августа 2018

Чтобы объединить два KStream, вам также необходимо указать окно соединения с помощью параметра JoinWindows.Сравните:

https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#kstream-kstream-join

Поскольку отсутствует третий параметр JoinWindows, компилятор считает, что вы хотите вызвать join(KTable,...) вместо join(KStream,...), и типы, очевидно, несовместимы.

...