У меня есть поток в Flink, который отправляет кубы из источника, выполняет преобразование в кубе (добавляя 1 к каждому элементу в кубе), а затем отправляет его в нисходящем направлении для печати пропускной способности каждую секунду.
Поток распараллеливается на 4 потока.
Если я правильно понимаю, оператор windowAll
является непараллельным преобразованием и поэтому должен уменьшить масштаб параллелизации до 1 и использовать его вместе с * 1006.*, суммируйте пропускную способность всех распараллеленных подзадач за последнюю секунду и напечатайте ее.Я не уверен, что получаю правильный вывод, так как пропускная способность каждую секунду печатается следующим образом:
1> 25
2> 226
3> 354
4> 372
1> 382
2> 403
3> 363
...
Вопрос: выводит ли потоковый принтер пропускную способность из каждого потока (1,2,3 и 4),или это только то, что он выбирает, например, поток 3, чтобы напечатать сумму пропускной способности всех подзадач?
Когда я устанавливаю параллелизм среды в начале в 1 env.setParallelism(1)
, я не получаю«x>» перед пропускной способностью, но я, кажется, получаю такую же (или даже лучшую) пропускную способность, как когда она установлена на 4. Пример:
45
429
499
505
1
503
524
530
...
Вот фрагмент кода программы:
imports...
public class StreamingCase {
public static void main(String[] args) throws Exception {
int parallelism = 4;
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(parallelism);
DataStream<Cube> start = env
.addSource(new CubeSource());
DataStream<Cube> adder = start
.map(new MapFunction<Cube, Cube>() {
@Override
public Cube map(Cube cube) throws Exception {
return cube.cubeAdd(1);
}
});
DataStream<Integer> throughput = ((SingleOutputStreamOperator<Cube>) adder)
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.apply(new AllWindowFunction<Cube, Integer, TimeWindow>() {
@Override
public void apply(TimeWindow tw,
Iterable<Cube> values,
Collector<Integer> out) throws Exception {
int sum = 0;
for (Cube c : values)
sum++;
out.collect(sum);
}
});
throughput.print();
env.execute("Cube Stream of Sweetness");
}
}