Я беру некоторые записи JSON из файла.Я хочу проанализировать json, а затем, основываясь на поле в json, обновить базовый путь функции группирования.
, например, для: Json-запись содержит имя поля 'user-id' и основано на том, что я хочуобновить мой базовый путь как BucketingSink ("/ data / app / users /" + user-id-field-value + "/")
Как мне это сделать?
Код: DataStream input = env.readTextFile ("/ home / user / Desktop / jsonFile");
DataStream<String> parsedJson = input.map((inputMsg)->{
String json="";
try{
json=jsonParser.parse(inputMsg).getAsString();
}catch (Exception e){
e.printStackTrace();
}
return json;
});
parsedJson.addSink(new BucketingSink<>(""));
}