Как распечатать план выполнения для пакетной обработки во Flink? - PullRequest
0 голосов
/ 10 декабря 2018

Я создал программу на Java с Flink, которая умножает 2 матрицы.Я использую пакетную среду (DataSet) для ее обработки и хочу показать план выполнения для этого.Когда я сделал какой-то потоковый пример (DataStream), я просто назвал StreamExecutionEnvironment.getExecutionEnvironment().getExecutionPlan().У Flink тот же метод, что и для партии, но когда я его вызываю, я получаю сообщение об ошибке: java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'..Я предполагаю, что делаю то, что точно описано здесь: https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_plans.html, но по какой-то причине я получаю исключение.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

public class MatrixMultiplication {

    private static final String MATRIX_A = "A";
    private static final String MATRIX_B = "B";
    public MatrixMultiplication() throws Exception {

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple4<String, Integer, Integer, Integer>> matrixA = env.readCsvFile("resources/matrixA.csv")
                .fieldDelimiter(",").types(Integer.class, Integer.class, Integer.class)
                .map(t -> new Tuple4<String, Integer, Integer, Integer>("A", t.f0, t.f1, t.f2))
                .returns(Types.TUPLE(Types.STRING, Types.INT, Types.INT, Types.INT));
        System.out.println("Matrix A");
        matrixA.print();

        DataSet<Tuple4<String, Integer, Integer, Integer>> matrixB = env.readCsvFile("resources/matrixB.csv")
                .fieldDelimiter(",").types(Integer.class, Integer.class, Integer.class)
                .map(t -> new Tuple4<String, Integer, Integer, Integer>("B", t.f0, t.f1, t.f2))
                .returns(Types.TUPLE(Types.STRING, Types.INT, Types.INT, Types.INT));
        System.out.println("Matrix B");
        matrixB.print();

        int columnsMatrixB = 2;
        int linesMatrixA = 2;

        DataSet<Tuple2<Tuple3<Integer, Integer, Integer>, Integer>> keyValueMatrixA = matrixA
                .mapPartition(new MapMatrixToKeysAndValues(columnsMatrixB));

        DataSet<Tuple2<Tuple3<Integer, Integer, Integer>, Integer>> keyValueMatrixB = matrixB
                .mapPartition(new MapMatrixToKeysAndValues(linesMatrixA));
        DataSet<Tuple2<Tuple3<Integer, Integer, Integer>, Integer>> matrixAB = keyValueMatrixA.union(keyValueMatrixB);
        DataSet<Tuple2<Tuple3<Integer, Integer, Integer>, Integer>> matrixAB_01 = matrixAB.groupBy(0)
                .reduce(new ProductReducer());

        DataSet<Tuple2<Tuple2<Integer, Integer>, Integer>> matrixAB_02 = matrixAB_01.map(new SumMapper());

        DataSet<Tuple2<Tuple2<Integer, Integer>, Integer>> productMatrixAB = matrixAB_02.groupBy(0)
                .reduce(new SumReducer());
        System.out.println("Matrix AB");
        productMatrixAB.print();

        // String executionPlan = env.getExecutionPlan();
        // System.out.println("ExecutionPlan ........................ ");
        System.out.println(productMatrixAB.getExecutionEnvironment().getExecutionPlan());
        // System.out.println("........................ ");
    }

    public static class MapMatrixToKeysAndValues implements
            MapPartitionFunction<Tuple4<String, Integer, Integer, Integer>, Tuple2<Tuple3<Integer, Integer, Integer>, Integer>> {

        private static final long serialVersionUID = 6992353073599144457L;
        private int count;

        public MapMatrixToKeysAndValues(int count) {
            this.count = count;
        }

        @Override
        public void mapPartition(Iterable<Tuple4<String, Integer, Integer, Integer>> values,
                Collector<Tuple2<Tuple3<Integer, Integer, Integer>, Integer>> out) throws Exception {

            for (Tuple4<String, Integer, Integer, Integer> tuple : values) {
                for (int c = 1; c <= count; c++) {

                    Tuple3<Integer, Integer, Integer> key = null;
                    Integer value = null;

                    if (MATRIX_A.equals(tuple.f0)) {
                        // key(i,k,i+j) for k=1...N
                        Integer i = tuple.f1;
                        Integer j = tuple.f2;
                        Integer k = c;
                        key = new Tuple3<Integer, Integer, Integer>(i, k, i + j);

                        // value matrix[i,j]
                        value = tuple.f3;
                    } else if (MATRIX_B.equals(tuple.f0)) {
                        // key(i,k,i+j) for i=1...L
                        Integer i = c;
                        Integer j = tuple.f1;
                        Integer k = tuple.f2;
                        key = new Tuple3<Integer, Integer, Integer>(i, k, i + j);

                        // value matrix[j,k]
                        value = tuple.f3;
                    }
                    out.collect(new Tuple2<Tuple3<Integer, Integer, Integer>, Integer>(key, value));
                }
            }
        }
    }

    public static class ProductReducer implements ReduceFunction<Tuple2<Tuple3<Integer, Integer, Integer>, Integer>> {

        private static final long serialVersionUID = 6166767956669902083L;

        @Override
        public Tuple2<Tuple3<Integer, Integer, Integer>, Integer> reduce(
                Tuple2<Tuple3<Integer, Integer, Integer>, Integer> value1,
                Tuple2<Tuple3<Integer, Integer, Integer>, Integer> value2) throws Exception {

            Integer product = null;
            product = value1.f1 * value2.f1;

            return new Tuple2<Tuple3<Integer, Integer, Integer>, Integer>(value1.f0, product);
        }
    }

    public static class SumMapper implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Integer>, Integer>, Tuple2<Tuple2<Integer, Integer>, Integer>> {

        private static final long serialVersionUID = -1437482917757334157L;

        @Override
        public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple2<Tuple3<Integer, Integer, Integer>, Integer> value)
                throws Exception {

            Tuple2<Integer, Integer> key = new Tuple2<Integer, Integer>(value.f0.f0, value.f0.f1);
            return new Tuple2<Tuple2<Integer, Integer>, Integer>(key, value.f1);
        }
    }
    public static class SumReducer implements ReduceFunction<Tuple2<Tuple2<Integer, Integer>, Integer>> {

        private static final long serialVersionUID = 7849401047616065465L;

        @Override
        public Tuple2<Tuple2<Integer, Integer>, Integer> reduce(Tuple2<Tuple2<Integer, Integer>, Integer> value1,
                Tuple2<Tuple2<Integer, Integer>, Integer> value2) throws Exception {

            Tuple2<Integer, Integer> key = new Tuple2<Integer, Integer>(value1.f0.f0, value1.f0.f1);
            Integer value = value1.f1 + value2.f1;

            return new Tuple2<Tuple2<Integer, Integer>, Integer>(key, value);
        }
    }
}

$ cat resources / matrixA.csv

1,1,1
1,2,3
1,3,4
1,4,-2
2,1,6
2,2,2
2,3,-3
2,4,1

$ cat resources / matrixB.csv

1,1,1
1,2,-2
2,1,4
2,2,3
3,1,-3
3,2,-2
4,1,0
4,2,4

1 Ответ

0 голосов
/ 10 декабря 2018

План должен быть распечатан, если вы удалите вызовы print().

print() также запускает выполнение.

...