Мне нужно разбить куб целых чисел на векторы, выполнить некоторые операции над каждым вектором (скажем, простым сложением), а затем объединить векторы обратно в куб.Векторные операции должны выполняться параллельно (т. Е. Вектор на поток).Кубы - это объекты, которые содержат идентификатор.
Я могу разбить куб на векторы и создать кортеж, используя идентификатор куба, а затем использовать keyBy (id) и создать разделение для векторов куба.Однако мне кажется, что для этого мне нужно использовать окно некоторой единицы времени.Приложение очень чувствительно к задержке, поэтому я бы предпочел объединять векторы по мере их поступления, возможно, используя какие-то логические часы (я знаю, сколько векторов находится в кубе), и когда прибывает последний вектор, отправьте повторно собранный куб вниз по течению.Возможно ли это во Flink?
Вот фрагмент кода, иллюстрирующий эту идею:
//Stream topology..
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Cube> stream = env
//Take cubes from collection and send downstream
.fromCollection(cubes)
//Split the cube(int[][][]) to vectors(int[]) and send downstream
.flatMap(new VSplitter()) //returns tuple with id at pos 1
.keyBy(1)
//For each value in each vector element, add its value with one.
.map(new MapFunction<Tuple2<CubeVector, Integer>, Tuple2<CubeVector, Integer>>() {
@Override
public Tuple2<CubeVector, Integer> map(Tuple2<CubeVector, Integer> cVec) throws Exception {
CubeVector cv = cVec.getField(0);
cv.cubeVectorAdd(1);
cVec.setField(cv, 0);
return cVec;
}
})
//** Merge vectors back to a cube **//
.
.
.
//The cube splitter to vectors..
public static class VSplitter implements FlatMapFunction<Cube, Tuple2<CubeVector, Integer>> {
@Override
public void flatMap(Cube cube, Collector<Tuple2<CubeVector, Integer>> out) throws Exception {
for (CubeVector cv : cubeVSplit(cube)) {
//out.assignTimestamp()
out.collect(new Tuple2<CubeVector, Integer>(cv, cube.getId()));
}
}
}