Можно ли записать набор данных в bucketingSink?
env.createInput(textInputFormat)
.rebalance()
.flatMap(new TopicBasedXMLProcessFunction(sourceType))
.name("Transformer")
.output(textOutputFormat)
.withParameters(conf)
.name("Write");
По сути, я получаю Tuple2<String, String>
от TopicBasedXMLProcessFunction
Я хотел бы динамически построить путь из tuple2.fo, как показано ниже
In-Stream Computing, я использовал Custom Bucketerдля построения динамического пути
@Override
public Path getBucketPath(Clock clock, Path basePath, Tuple2<String, String> element) {
return new Path(basePath + "/schema=" + element.f0 + "/");
}
Я уже реализовал аналогичные решения в StreamExecutionEnvironment(STREAM)
, но верно, я не могу добиться с помощью ExecutionEnvironment(BATCH)
, есть ли указатель для достижения этого