У меня проблема с тем, что я пытаюсь покинуть 2 потока. MergedKey имеет более 100
списки с тем же ключом и DataStream имеет только 1 листинг с тем же ключом, что и mergedKey. Я хочу в endStream Значение из mergedKey Объединить с DataStream.
//get DataStream
final KStream<String, GenericRecord> DataStream = builder.stream("Datastreams");
// Transform merged to Equals Keys to DataStream.Iot
final KStream<String, GenericRecord> mergedKey = mergedFoIObs
.map((key, value) -> KeyValue.pair(value.get("Datastream").toString(), value));
// Join the DataStream with MergedStream
final KStream<String, String> mergedFoIObsData = mergedKey.leftJoin(
DataStream,
(value, data) -> {
try {
if(data != null{
value.put("Datastream", data.toString());
JSONObject jo = (JSONObject) new JSONParser().parse(value.toString());
return jo.toJSONString();}
return null
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}, JoinWindows.of(10000));
Но моя проблема в том, что в Endstream я получаю только 1 листинг с правильными значениями, а другие листинги имеют нулевые значения.
Taht означает, что данные являются нулевыми после первого "раунда".
Когда я преобразовываю DataStream в KTable, у меня возникает проблема с тем, что я получаю правильные списки, но только 37 списков, поэтому 60 ошибочно.
Надеюсь, вы мне поможете.