Присоединяйтесь к двум большим томам PCollection есть проблема производительности - PullRequest
3 голосов
/ 09 июля 2019

Присоединяйтесь к двум Pcollection с подходом CoGroupsByKey, который занимает часы, чтобы выполнить более 8 миллионов записей.Отмечено в другом сообщении о потоке стека CoGbkResult имеет более 10000 элементов, требуется повторение (которое может быть медленным) , что "CoGbkResult имеет более 10000 элементов, требуется повторение (которое может быть медленным)."

Любые предложения по улучшению этой производительности с использованием этого подхода.

Вот фрагмент кода,

PCollection<TableRow> pc1 = ...;
PCollection<TableRow> pc2 = ...;

WithKeys<String, TableRow> withKeyValue = 
  WithKeys.of((TableRow row) -> String.format("%s",row.get("KEYNAME")))
          .withKeyType(TypeDescriptors.strings());

PCollection<KV<String,TableRow>> keyed_pc1 =
  pc1.apply("WithKeys", withKeyValue );

PCollection<KV<String,TableRow>> keyed_pc2 = 
  pc2.apply("WithKeys", withKeyValue );

// (org.apache.beam.sdk.extensions.joinlibrary.Join class)
PCollection<KV<String,KV<TableRow,TableRow>>> joinedCollection = 
  Join.innerJoin(keyed_pc1, keyed_pc2); 

Ответы [ 2 ]

1 голос
/ 10 июля 2019

Спецификация Apache Beam не определяет выполнение объединения, и нет более быстрого способа записи внутренних объединений, кроме SDK. Таким образом, ответ на этот вопрос зависит от того, что выполняет объединение, то есть от какого участника. Я не знаю бегунов Flink или Spark, поэтому этот ответ будет конкретным для бегуна Dataflow.

Если вы еще этого не сделали, взгляните на это сообщение в блоге на эту тему . В сообщении блога описывается служба перемешивания потоков данных, которую можно включить вручную. Этот сервис является лучшей реализацией, чем текущее значение по умолчанию, и в целом приводит к гораздо более быстрому выполнению, но особенно для соединений.

Чтобы включить Dataflow Shuffle Service, передайте следующие значения: flags :

--experiments=shuffle_mode=service
--region=<allowed region>

Где разрешенные регионы для перемешивания: «us-central1», «europe-west1», «europe-west4», «asia-northeast1».

0 голосов
/ 11 июля 2019

Насколько я понимаю, у вашего объединения есть горячая клавиша: клавиша, которая имеет много записей и результирующая запись не помещается в памяти рабочего. Это означает, что при последующем использовании это может привести к повторной загрузке данных, что может снизить производительность.

Join.innerJoin по-прежнему использует CoGBK внутри для объединения, поэтому использование этой библиотеки не обязательно будет более эффективным. Хотя порядок перебора коллекций может быть важен.

Если у вас есть небольшая коллекция с одной стороны (умещается в памяти), вы можете использовать подход таблицы поиска для объединения. См. JoinAsLookup для справки.

Если у вас есть какие-то средства, чтобы узнать, какая клавиша горячая, вы можете разделить ее на более мелкие перед объединением, но это требует больше работы на инженерной стороне и некоторого предвидения данных.

...