Как разделить и объединить данные (векторы) в Apache's Flink, не используя windows - PullRequest
0 голосов
/ 04 июня 2018

Мне нужно разбить куб целых чисел на векторы, выполнить некоторые операции над каждым вектором (скажем, простым сложением), а затем объединить векторы обратно в куб.Векторные операции должны выполняться параллельно (т. Е. Вектор на поток).Кубы - это объекты, которые содержат идентификатор.

Я могу разбить куб на векторы и создать кортеж, используя идентификатор куба, а затем использовать keyBy (id) и создать разделение для векторов куба.Однако мне кажется, что для этого мне нужно использовать окно некоторой единицы времени.Приложение очень чувствительно к задержке, поэтому я бы предпочел объединять векторы по мере их поступления, возможно, используя какие-то логические часы (я знаю, сколько векторов находится в кубе), и когда прибывает последний вектор, отправьте повторно собранный куб вниз по течению.Возможно ли это во Flink?

Вот фрагмент кода, иллюстрирующий эту идею:

//Stream topology..
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Cube> stream = env
    //Take cubes from collection and send downstream
    .fromCollection(cubes)
    //Split the cube(int[][][]) to vectors(int[]) and send downstream
    .flatMap(new VSplitter()) //returns tuple with id at pos 1
    .keyBy(1)
    //For each value in each vector element, add its value with one.
    .map(new MapFunction<Tuple2<CubeVector, Integer>, Tuple2<CubeVector, Integer>>() {
        @Override
        public Tuple2<CubeVector, Integer> map(Tuple2<CubeVector, Integer> cVec) throws Exception {
            CubeVector cv = cVec.getField(0);
            cv.cubeVectorAdd(1);
            cVec.setField(cv, 0);
            return cVec;
        }
    })

    //** Merge vectors back to a cube **//

    .
    .
    .

//The cube splitter to vectors..
public static class VSplitter implements FlatMapFunction<Cube, Tuple2<CubeVector, Integer>> {
    @Override
    public void flatMap(Cube cube, Collector<Tuple2<CubeVector, Integer>> out) throws Exception {
        for (CubeVector cv : cubeVSplit(cube)) {
            //out.assignTimestamp()
            out.collect(new Tuple2<CubeVector, Integer>(cv, cube.getId()));
        }
    }
}

1 Ответ

0 голосов
/ 11 июня 2018

Вы можете использовать FlatMapFunction, который продолжает добавлять CubeVectors, пока не увидит достаточно CubeVectors, чтобы восстановить Cube.Следующий фрагмент кода должен сделать трюк:

DataStream<Tuple2<CubeVector, Integer> input = ...

input.keyBy(1).flatMap(
    new RichFlatMapFunction<Tuple2<CubeVector, Integer>, Cube> {

        private static final ListStateDescriptor<CubeVector> cubeVectorsStateDescriptor = new ListStateDescriptor<CubeVector>(
                "cubeVectors",
                new CubeVectorTypeInformation());

        private static final ValueStateDescriptor<Integer> cubeVectorCounterDescriptor = new ValueStateDescriptor<>(
                "cubeVectorCounter",
                BasicTypeInfo.INT_TYPE_INFO);

        private ListState<CubeVector> cubeVectors;

        private ValueState<Integer> cubeVectorCounter;

        @Override
        public void open(Configuration parameters) {
            cubeVectors = getRuntimeContext().getListState(cubeVectorsStateDescriptor);
            cubeVectorCounter = getRuntimeContext().getState(cubeVectorCounterDescriptor);
        }

        @Override
        public void flatMap(Tuple2<CubeVector, Integer> cubeVectorIntegerTuple2, Collector<Cube> collector) throws Exception {
            cubeVectors.add(cubeVectorIntegerTuple2.f0);
            final int oldCounterValue = cubeVectorCounter.value();

            final int newCounterValue = oldCounterValue + 1;

            if (newCounterValue == NUMBER_CUBE_VECTORS) {
                Cube cube = createCube(cubeVectors.get());

                cubeVectors.clear();
                cubeVectorCounter.update(0);

                collector.collect(cube);
            } else {
                cubeVectorCounter.update(newCounterValue);
            }
        }
    });
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...