Объединяет несколько потоков по одним и тем же ключам - PullRequest
0 голосов
/ 16 января 2020

Сообщество Flink!

У меня есть вопрос, касающийся объединения нескольких потоков с помощью одних и тех же ключей во Flink (equi-joins). Я все еще новичок ie, оценивающий Flink для моей команды, переводящий наше приложение пакетной обработки Spark на потоковую обработку.

Примечание. Загляните в Apache Машинное отделение Флинка .

Чтобы упростить вопрос, давайте предположим, что у вас есть 3 потока, и каждый поток имеет уникальные записи, которые могут быть введены с помощью id поле. Для каждой записи в потоке вы найдете соответствующую запись в других потоках. Вы хотите присоединиться к этим потокам в поле id .

Вопросы:

  • Когда вы присоединяетесь к потоку № 1 и потоку № 2, я понимаю, что содержимое оба потока будут перетасованы, основываясь на ключе соединения. Когда я присоединяюсь к результирующему потоку с потоком # 3, я предполагаю, что поток # 3 будет перетасован, но будут ли снова перетасованы предыдущие записи результата (т. Е. Из соединения между # 1 и # 2)?

(в Spark, я думаю, что предыдущие результаты объединения не будут перемешаны при условии, что ключи не изменены и используется тот же разделитель ha sh).

  • При объединении с поток # 3, будут ли сериализованы и десериализованы результаты объединения потоков # 1 и #?

  • В этом примере у нас есть 2 оператора соединения (между потоком # 1 и # 2, и между полученными результатами соединения и потоком # 3). Из того, что я вижу, у каждого из моих операторов есть состояние. Предполагая, что я использую Global Windows и мои операции соединения просто сохраняют все поля, есть ли дублирующие данные между двумя состояниями операторов? (Я наивно думаю, что не следует ...)

То, что я говорю, немного сбивает с толку, но я думаю, что первый оператор запомнит данные из потока # 1 и поток № 2, а затем второй оператор запомнит данные из потока № 1, потока № 2 и потока № 3. Что я заметил, размер состояния для моего первого оператора велик (в моем эксперименте данные за 1 год), но размер состояния для второго оператора намного больше ... Окончательный размер состояния контрольной точки, похоже, равен размеру состояния для # Соединение 1 / # 2 плюс размер состояния для соединения # 1 / # 2 / # 3 (не должно ли это быть размером # 3, если данные соединения # 1 / # 2 одинаковы?)

Спасибо, Николас

1 Ответ

1 голос
/ 16 января 2020

В настоящее время в Flink для каждого объединения потоков требуется полное перемешивание, включая сериализацию и десериализацию, как вы упоминали. Основная причина в том, что Flink не может связать операторов с 2 входами в предыдущий оператор. В настоящее время находится в процессе выполнения , чтобы позволить N операторам входов, которые в точности исключают возможность дополнительного перемешивания в вашем случае использования.

Каждый из операторов объединения поддерживает свое состояние индивидуально. Это означает, что ваше второе объединение содержит все присоединенные записи и все записи из потока № 3. Если ваше первое объединение имеет мощность 1, второе объединение имеет больший размер состояния, чем первое. Причина кажущейся избыточной репликации заключается в том, что при использовании времени windows (обычно единственно возможного способа объединения потоков) оба оператора могут находиться в разное время, так что первый оператор уже удалил записи из своего состояния, когда второй оператор обрабатывает их.

...