Оператор windowAll во Flink уменьшает масштаб параллелизации до 1? - PullRequest
0 голосов
/ 13 июня 2018

У меня есть поток в Flink, который отправляет кубы из источника, выполняет преобразование в кубе (добавляя 1 к каждому элементу в кубе), а затем отправляет его в нисходящем направлении для печати пропускной способности каждую секунду.

Поток распараллеливается на 4 потока.

Если я правильно понимаю, оператор windowAll является непараллельным преобразованием и поэтому должен уменьшить масштаб параллелизации до 1 и использовать его вместе с * 1006.*, суммируйте пропускную способность всех распараллеленных подзадач за последнюю секунду и напечатайте ее.Я не уверен, что получаю правильный вывод, так как пропускная способность каждую секунду печатается следующим образом:

1> 25
2> 226
3> 354
4> 372
1> 382
2> 403
3> 363
...

Вопрос: выводит ли потоковый принтер пропускную способность из каждого потока (1,2,3 и 4),или это только то, что он выбирает, например, поток 3, чтобы напечатать сумму пропускной способности всех подзадач?

Когда я устанавливаю параллелизм среды в начале в 1 env.setParallelism(1), я не получаю«x>» перед пропускной способностью, но я, кажется, получаю такую ​​же (или даже лучшую) пропускную способность, как когда она установлена ​​на 4. Пример:

45
429
499
505
1
503
524
530
...

Вот фрагмент кода программы:

imports...

public class StreamingCase {
    public static void main(String[] args) throws Exception {
        int parallelism = 4;

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setParallelism(parallelism);

        DataStream<Cube> start = env
                .addSource(new CubeSource());

        DataStream<Cube> adder = start
                .map(new MapFunction<Cube, Cube>() {
                    @Override
                    public Cube map(Cube cube) throws Exception {
                        return cube.cubeAdd(1);
                    }
                });

        DataStream<Integer> throughput = ((SingleOutputStreamOperator<Cube>) adder)
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                .apply(new AllWindowFunction<Cube, Integer, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow tw,
                                      Iterable<Cube> values,
                                      Collector<Integer> out) throws Exception {
                        int sum = 0;
                        for (Cube c : values)
                            sum++;
                        out.collect(sum);
                    }
                });
        throughput.print();
        env.execute("Cube Stream of Sweetness");
    }
}

1 Ответ

0 голосов
/ 13 июня 2018

Если для параллелизма среды задано значение 3, и вы используете оператор WindowAll, только параллелизм запускает только оконный оператор 1. Приемник все еще будет работать с параллелизмом 3. Следовательно, план выглядит следующим образом:

In_1 -\               /- Out_1
In_2 --- WindowAll_1 --- Out_2
In_3 -/               \- Out_3

Оператор WindowAll отправляет свои выходные данные для своих последующих задач, используя стратегию циклического перебора.По этой причине разные потоки генерируют записи результатов программы.

Когда вы устанавливаете параллелизм среды на 1, все операторы запускаются с одной задачей.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...