Как добавить приемник из одного потока данных на разные пути в зависимости от конкретного ключа в json? - PullRequest
0 голосов
/ 25 апреля 2019

У меня есть jsons, как,

{
  "name":"someone",
  "job":"doctor",
  "etc":"etc"
}

в каждом JSON есть разные значения для «работы», такие как врач, пилот, водитель, сторож и т. Д. Я хочу разделить каждый JSON на основе значения «задания» и хранить его в разных местах, таких как /home/doctor, /home/pilot, /home/driver и т. д.

Я пытался сделать это с помощью функции SplitStream, но мне нужно указать это значение, чтобы оно соответствовало условию.

public class MyFlinkJob {   
    private static JsonParser jsonParser = new JsonParser();
    private static String key_1 = "doctor";
    private static String key_2 = "driver";
    private static String key_3 = "pilot";
    private static String key_default = "default";

    public static void main(String args[]) throws Exception {
        Properties prop = new Properties();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafka);
        props.setProperty("group.id", "myjob");

        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);
        DataStream<String> record = env.addSource(myConsumer).rebalance()

        SplitStream<String> split = record.split(new OutputSelector<String>() {
            @Override
            public Iterable<String> select(String val) {
                JsonObject json = (JsonObject)jsonParser.parse(val);
                String jsonValue = CommonFields.getFieldValue(json, "job");
                List<String> output = new ArrayList<String>();

                if (key_1.equalsIgnoreCase(jsonValue)) {
                    output.add("doctor");
                } else if (key_2.equalsIgnoreCase(jsonValue)) {
                    output.add("driver");
                } else if (key_3.equalsIgnoreCase(jsonValue)) {
                    output.add("pilot");
                } else {
                    output.add("default");
                }
                return output;
            }});

        DataStream<String> doctor = split.select("doctor");
        DataStream<String> driver = split.select("driver");
        DataStream<String> pilot = split.select("pilot");
        DataStream<String> default1 = split.select("default");
        doctor.addSink(getBucketingSink(batchSize, prop, key_1));
        driver.addSink(getBucketingSink(batchSize, prop, key_2));
        pilot.addSink(getBucketingSink(batchSize, prop, key_3));
        default1.addSink(getBucketingSink(batchSize, prop, key_default));
        env.execute("myjob");
    } catch (IOException ex) {
        ex.printStackTrace();
    } finally {
        if (input != null) {
            try {
                input.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

public static BucketingSink<String> getBucketingSink(Long BatchSize, Properties prop, String key) {
    BucketingSink<String> sink = new BucketingSink<String>("hdfs://*/home/"+key);
    Configuration conf = new Configuration();
    conf.set("hadoop.job.ugi", "hdfs");
    sink.setFSConfig(conf);
    sink.setBucketer(new DateTimeBucketer<String>(prop.getProperty("DateTimeBucketer")));
    return sink;
}
}

предположим, что если в "задании" присутствует какое-либо другое значение, например, инженер или что-то еще, и я не указал в классе, то оно переходит в папку по умолчанию, есть ли способ автоматически разделить эти json-события на основе значения "задания" без указав его и создав путь, содержащий имя значения, например / home / enginerr?

1 Ответ

1 голос
/ 25 апреля 2019

Вы хотите использовать BucketingSink , который поддерживает запись записей в отдельные сегменты на основе значения поля. У меня, вероятно, есть функция map, которая принимает строку JSON, анализирует ее и выдает Tuple2<String, String>, где первый элемент - это значение поля job в JSON, а второй - полный JSON. строка.

...