Apache Flink: среда исполнения и несколько приемников - PullRequest
0 голосов
/ 28 июня 2018

Мой вопрос может вызвать путаницу, поэтому сначала прочтите описание. Может быть полезно определить мою проблему. Я добавлю свой код позже в конце вопроса (также приветствуются любые предложения относительно моей структуры / реализации кода). Спасибо за любую помощь заранее!

Мой вопрос:

  1. Как определить несколько приемников в пакетной обработке Flink без повторного получения данных из одного источника?

  2. В чем разница между createCollectionEnvironment() и getExecutionEnvironment()? Какой из них я должен использовать в местной среде?

  3. Какая польза от env.execute()? Мой код будет выводить результат без этого предложения. если я добавлю это предложение, появится исключение:

-

Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'. 
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:940) 
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:922) 
    at org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:34) 
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816) 
    at MainClass.main(MainClass.java:114)

Описание: Новое в программировании. В последнее время мне нужно обработать некоторые данные (группировать данные, вычислить стандартное отклонение и т. Д.), Используя пакетную обработку Flink. Однако я дошел до того, что мне нужно вывести два DataSet. Структура была примерно такая

Из источника (база данных) -> DataSet 1 (добавить индекс с помощью zipWithIndex ()) -> DataSet 2 (выполнить некоторые вычисления, сохраняя индекс) -> DataSet 3

Сначала я вывел DataSet 2, индекс, например, от 1 до 10000; И тогда я вывожу DataSet 3, индекс становится от 10001 до 20000, хотя я не изменил значение ни в одной функции. Я предполагаю, что при выводе DataSet 3 вместо использования результата ранее рассчитанный DataSet 2 он начинался с получения данных из базы данных снова и последующего вычисления. С использованием функции ZipWithIndex() она не только дает неправильный порядковый номер, но и увеличивает соединение с дБ.

Полагаю, это относится к среде выполнения, как, например, когда я использую

ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment ();

даст "неправильный" номер индекса (10001-20000) и

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment ();

даст правильный номер индекса (1-10000) Время и количество подключений к базе данных различаются, и порядок печати будет обратным.

ОС, БД, другие подробности и версии среды: IntelliJ IDEA 2017.3.5 (версия для сообщества) Сборка № IC-173.4674.33, построена 6 марта 2018 г. JRE: 1.8.0_152-release-1024-b15 amd64 JVM: 64-битная серверная виртуальная машина OpenJDK от JetBrains s.r.o Windows 10 10.0

Мой тестовый код (Java):

public static void main (String [] args) генерирует Exception { ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment ();

    //Table is used to calculate the standard deviation as I figured that there is no such calculation in DataSet.
    BatchTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(env);

    //Get Data from a mySql database
    DataSet<Row> dbData =
            env.createInput(
                    JDBCInputFormat.buildJDBCInputFormat()
                            .setDrivername("com.mysql.cj.jdbc.Driver")
                            .setDBUrl($database_url)
                            .setQuery("select value from $table_name where id =33")
                            .setUsername("username")
                            .setPassword("password")
                            .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.DOUBLE_TYPE_INFO))
                            .finish()
            );

    // Add index for assigning group (group capacity is 5)
    DataSet<Tuple2<Long, Row>> indexedData = DataSetUtils.zipWithIndex(dbData);

    // Replace index(long) with group number(int), and convert Row to double at the same time
    DataSet<Tuple2<Integer, Double>> rawData = indexedData.flatMap(new GroupAssigner());

    //Using groupBy() to combine individual data of each group into a list, while calculating the mean and range in each group
    //put them into a POJO named GroupDataClass
    DataSet<GroupDataClass> groupDS = rawData.groupBy("f0").combineGroup(new GroupCombineFunction<Tuple2<Integer, Double>, GroupDataClass>() {
        @Override
        public void combine(Iterable<Tuple2<Integer, Double>> iterable, Collector<GroupDataClass> collector) {
            Iterator<Tuple2<Integer, Double>> it = iterable.iterator();
            Tuple2<Integer, Double> var1 = it.next();
            int groupNum = var1.f0;

            // Using max and min to calculate range, using i and sum to calculate mean
            double max = var1.f1;
            double min = max;
            double sum = 0;
            int i = 1;

            // The list is to store individual value
            List<Double> list = new ArrayList<>();
            list.add(max);

            while (it.hasNext())
            {
                double next = it.next().f1;
                sum += next;
                i++;
                max = next > max ? next : max;
                min = next < min ? next : min;
                list.add(next);
            }

            //Store group number, mean, range, and 5 individual values within the group
            collector.collect(new GroupDataClass(groupNum, sum / i, max - min, list));
        }
    });

    //print because if no sink is created, Flink will not even perform the calculation.
    groupDS.print();


    // Get the max group number and range in each group to calculate average range
    // if group number start with 1 then the maximum of group number equals to the number of group
    // However, because this is the second sink, data will flow from source again, which will double the group number
    DataSet<Tuple2<Integer, Double>> rangeDS = groupDS.map(new MapFunction<GroupDataClass, Tuple2<Integer, Double>>() {
        @Override
        public Tuple2<Integer, Double> map(GroupDataClass in) {
            return new Tuple2<>(in.groupNum, in.range);
        }
    }).max(0).andSum(1);

    // collect and print as if no sink is created, Flink will not even perform the calculation.
    Tuple2<Integer, Double> rangeTuple = rangeDS.collect().get(0);
    double range = rangeTuple.f1/ rangeTuple.f0;
    System.out.println("range = " + range);
}

public static class GroupAssigner implements FlatMapFunction<Tuple2<Long, Row>, Tuple2<Integer, Double>> {
    @Override
    public void flatMap(Tuple2<Long, Row> input, Collector<Tuple2<Integer, Double>> out) {

        // index 1-5 will be assigned to group 1, index 6-10 will be assigned to group 2, etc.
        int n = new Long(input.f0 / 5).intValue() + 1;
        out.collect(new Tuple2<>(n, (Double) input.f1.getField(0)));
    }
}

1 Ответ

0 голосов
/ 28 июня 2018

1) Можно подключить источник к нескольким приемникам, источник выполняется только один раз, а записи передаются на несколько приемников. См. Этот вопрос Может ли Flink записывать результаты в несколько файлов (например, в Hadoop MultipleOutputFormat)?

2) getExecutionEnvironment - это верный способ получить окружающую среду, когда вы хотите запустить свою работу. createCollectionEnvironment - хороший способ поиграть и проверить. См. документацию

3) Сообщение об ошибке исключения очень ясно: если вы вызываете print или collect, ваш поток данных выполняется. Таким образом, вы должны сделать выбор:

  • Либо вы вызываете print / collect в конце вашего потока данных, и он выполняется и печатается. Это хорошо для тестирования. Помните, что вы можете вызывать метод сбора / печати только один раз для каждого потока данных, в противном случае он выполняется много раз, пока он не определен полностью
  • Либо вы добавляете приемник в конце потока данных и вызываете env.execute (). Это то, что вы хотите сделать, когда ваш поток станет более зрелым.
...