У меня простой вопрос. Я хочу соединить 2 KStream
s на одном ключе с GenericRecord
в качестве значения:
final KStream<String, GenericRecord> obs = builder.stream("Observations");
final KStream<String, GenericRecord> foI = builder.stream("FeaturesOfInterest");
final KStream<String, GenericRecord> transformfoIT = foIT
.map((key, value) -> KeyValue.pair(value.get("Observation").toString(), value));
final KStream<String, GenericRecord> merged = obsT.join(
transformfoIT,
(value, location) -> {
value.put("FeatureOfInterest", location);
System.out.println();
return value;
});
tranfsfromfoIT
- это просто, так что установите клавиши правильно. Так что obs.key
и tranformfoIT.key
одинаковы для сообщений.
Но мое соединение не работает, потому что я получаю:
Метод join (KTable, ValueJoiner) в типе KStream не применим для аргументов
(KStream, (значение, местоположение) -> {}) "
Понятия не имею, как это исправить.
Надеюсь, вы мне поможете.