Flink DataSet для BuckingSink - PullRequest
       6

Flink DataSet для BuckingSink

0 голосов
/ 11 февраля 2019

Можно ли записать набор данных в bucketingSink?

env.createInput(textInputFormat)
                .rebalance()
                .flatMap(new TopicBasedXMLProcessFunction(sourceType))
                .name("Transformer")
                .output(textOutputFormat)
                .withParameters(conf)
                .name("Write"); 

По сути, я получаю Tuple2<String, String> от TopicBasedXMLProcessFunction Я хотел бы динамически построить путь из tuple2.fo, как показано ниже

In-Stream Computing, я использовал Custom Bucketerдля построения динамического пути

@Override
    public Path getBucketPath(Clock clock, Path basePath, Tuple2<String, String> element) {
        return new Path(basePath + "/schema=" + element.f0 + "/");
    }

Я уже реализовал аналогичные решения в StreamExecutionEnvironment(STREAM), но верно, я не могу добиться с помощью ExecutionEnvironment(BATCH), есть ли указатель для достижения этого

...