Могу ли я использовать пользовательский разделитель с группированием по? - PullRequest
0 голосов
/ 12 октября 2018

Допустим, я знаю, что мой набор данных несбалансирован, и я знаю распределение ключей.Я хотел бы использовать это, чтобы написать собственный разделитель, чтобы получить максимальную отдачу от экземпляров операторов.

Я знаю о DataStream # partitionCustom .Однако, если мой поток настроен на ключ, будет ли он работать правильно?Моя работа выглядела бы примерно так:

KeyedDataStream afterCustomPartition = keyedStream.partitionCustom(new MyPartitioner(), MyPartitionKeySelector())

DataStreamUtils.reinterpretAsKeyedStream(afterCustomPartition, new MyGroupByKeySelector<>()).sum()

Я пытаюсь добиться:

  • Наличие потока keyBy в соответствии с некоторым ключом, так что функция сокращения будет тольковызывается с элементами этого ключа.
  • Группировка путем разделения работы по узлам на основе некоторого пользовательского разбиения.
  • Пользовательское разбиение, возвращающее число, основанное на количестве экземпляров параллельного оператора (которое будетбыть исправленным и не подлежащим изменению масштаба).
  • Пользовательский раздел, возвращающий различные значения из keyBy.Тем не менее, keyBy(x) = keyBy(y) => partition(x) = partition(y).
  • Наличие предварительной агрегации для минимизации сетевого трафика перед разбиением.

Пример варианта использования:

  • Набор данных: [(0, A), (0, B), (0, C), (1, D), (2, E)]
  • Количество экземпляров параллельных операторов: 2
  • Группировка по функции: возвращает 1-й элемент пары
  • Функция разделения: возвращает 0 для ключа 0 и 1 для ключей 1 и 2. Преимущество: иметь дело с перекосом данных, который может быть отправленключи 0 и 1 к одному и тому же экземпляру оператора, что будет означать, что один экземпляр оператора получит 80% набора данных.

1 Ответ

0 голосов
/ 12 октября 2018

К сожалению, это невозможно.DataStreamUtils.reinterpretAsKeyedStream() требует, чтобы данные были разделены идентично, как если бы вы назвали keyBy().

Причиной этого ограничения являются группы ключей и то, как ключи сопоставляются с группами ключей.Группа ключей - это единица Флинка того, как распределяется состояние ключа.Количество групп ключей определяет максимальный параллелизм оператора и настраивается с помощью setMaxParallelism().Ключи назначаются группам ключей с внутренней хэш-функцией.Изменяя разделение ключей, ключи для одной и той же группы ключей будут распределены по нескольким машинам, которые не будут работать.

Чтобы настроить назначение ключей компьютерам, необходимо изменить назначение ключей.в ключевые группы.Однако нет общедоступного или доступного интерфейса для этого.Поэтому пользовательские распределения ключей не поддерживаются в Flink 1.6.

...