Как установить динамический базовый путь для функции BucketingSink Flink? - PullRequest
0 голосов
/ 19 февраля 2019

Я беру некоторые записи JSON из файла.Я хочу проанализировать json, а затем, основываясь на поле в json, обновить базовый путь функции группирования.

, например, для: Json-запись содержит имя поля 'user-id' и основано на том, что я хочуобновить мой базовый путь как BucketingSink ("/ data / app / users /" + user-id-field-value + "/")

Как мне это сделать?

Код: DataStream input = env.readTextFile ("/ home / user / Desktop / jsonFile");

    DataStream<String> parsedJson = input.map((inputMsg)->{

        String json="";
        try{

            json=jsonParser.parse(inputMsg).getAsString();

        }catch (Exception e){
            e.printStackTrace();
        }
        return json;

    });

   parsedJson.addSink(new BucketingSink<>(""));

}

1 Ответ

0 голосов
/ 20 февраля 2019

Используйте метод BucketingSink.setBucketer () для установки создаваемого вами класса, который реализует интерфейс Bucketer и использует значение поля user-id в качестве пути подкатегории.

...