spark sql: как добиться параллельной обработки фрейма данных на уровне группы, но в каждой группе требуется последовательная обработка строк - PullRequest
0 голосов
/ 17 октября 2019
  1. Применение группировки к фрейму данных. Допустим, это привело к 100 группам по 10 строк в каждой.
  2. У меня есть функция, которую нужно применить к каждой группе. Это может происходить параллельно и в любом порядке (т. Е. На усмотрение выбора любой группы в любом порядке для выполнения).
  3. Но в группе мне нужна гарантия последовательной обработкистрок. Потому что после обработки каждой строки в группе я использую выходные данные при обработке любой из строк, оставшихся в группе.

Мы выбрали следующий подход, когда все работало на драйвере и не могло использовать параллельную обработку искрового кластера между узлами (как и ожидалось, производительность была очень плохой)

1) Breakосновной DF на несколько фреймов данных, помещенных в массив:

val securityIds = allocStage1DF.select("ALLOCONEGROUP").distinct.collect.flatMap(_.toSeq)
val bySecurityArray = securityIds.map(securityIds => allocStage1DF.where($"ALLOCONEGROUP" <=> securityIds))

2) Цикл по фрейму данных и переход к методу для обработки, строка за строкой сверху фрейма данных:

df.coalesce(1).sort($"PRIORITY" asc).collect().foreach({
  row => AllocOneOutput.allocOneOutput(row)}
)

Мы ищем комбинацию параллельной и последовательной обработки.

  1. Параллельная обработка на уровне группы. потому что все они являются независимыми группами и могут быть распараллелены.

  2. В каждой группе строки должны обрабатываться одна за другой в последовательности, что очень важно для нашего варианта использования.

пример данных

Применить группирование по SECURITY_ID, CC, BU, MPU, что дает нам 2 группы сверху (SECID_1, CC_A, BU_A,MPU_A и SECID_2, CC_A, BU_A, MPU_A).

с помощью матрицы приоритетов (ничего, кроме таблицы ссылок для присвоения ранга строкам), мы переносим каждую группу в ниже:

Транспонированные данные

Каждая строка в вышеуказанной группе имеет приоритет и сортируется в указанном порядке. Теперь я хочу обрабатывать каждую строку одну за другой, передавая их функции и получая вывод, как показано ниже:

output

Подробное объяснение сценария использования:

  1. Базовый фрейм данных содержит данные о всех торговых позициях финансовой фирмы. некоторые клиенты покупают (долго) данный финансовый продукт (уникально идентифицируемый по securityId), а некоторые продают (коротко) его.
  2. Идея нашего приложения состоит в том, чтобы идентифицировать / связать длинные позиции и короткие позиции в заданном идентификаторе безопасности.
  3. Поскольку такое сопряжение происходит с идентификатором безопасности, мы сказали, что базовый фрейм данныхразделены на группы на основе этого идентификатора безопасности, и каждая группа может обрабатываться независимо.
  4. Почему мы ищем последовательную обработку внутри группы? Это происходит потому, что, когда в данной группе имеется много длинных позиций и много коротких позиций (как в примере данных), справочная таблица (матрица приоритетов) решает, с какой длинной позицией следует сопоставить короткую позицию. в основном, это дает порядок обработки.
  5. Вторая причина заключается в том, что когда заданное длинное количество и короткое количество не равны, то остаточное количество подходит для спаривания. то есть, если длинное количество осталось, то оно может быть соединено со следующим коротким количеством, доступным в группе согласно приоритету или наоборот.
  6. По причинам, упомянутым в 4 и 5, мы ищемобрабатывать строку за строкой в ​​группе.

Выше приведены точки, описанные ниже с использованием набора данных.

Base DataFrame

    +-------------+----------+----------+------
ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
+-------------+----------+----------+------
  1|      secId|   Acc1|       +100|
  2|      secId|   Acc2|       -150|
  3|      secId|   Acc3|       -25|
  4|      secId2|   Acc3|       -25|
  5|      secId2|   Acc3|       -25|

Базовый кадр данных разделенна основе группы по идентификатору безопасности. Давайте воспользуемся второй группой, как показано ниже

+-------------+----------+----------+------
ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
+-------------+----------+----------+------
  1|      secId|   Acc1|       +100|
  2|      secId|   Acc2|       -150|
  3|      secId|   Acc3|       -25|

. В вышеприведенном случае положительная позиция 100 может быть в паре с -50 или -25. Чтобы разорвать связь, следующая таблица ссылок, называемая приоритетной матрицей, помогает определить порядок.

+-------------+----------+----------+------
+vePositionAccount|-vePositionAccount| RANK
+-------------+----------+----------+------
  Acc1|            Acc3|              1|     
  Acc1|            Acc2|              2|  

, поэтому из вышеприведенной матрицы мы знаем, что сначала будут соединены строки № 1 и 3, а затем строки № 1 и2. Это тот порядок (последовательная обработка), о котором мы говорим. Позволяет соединить их сейчас, как показано ниже:

+-------------+----------+----------+------+-------------+----------+----------+------
+veROW_NO|+veSECURITY_ID| +veACCOUNT|+vePOSITION|+veROW_NO|+veSECURITY_ID| +veACCOUNT|+vePOSITION|
+-------------+----------+----------+------+-------------+----------+----------+------
  1|      secId|    Acc1|       +100|      3|       secId|   Acc3|       -25|
  1|      secId|    Acc1|       +100|      2|       secId|   Acc2|       -150|

Что происходит, когда строка 1 обрабатывается после строки 2? (это то, что нам нужно)

1.После обработки строки 1 - Позиция в Acc1 будет (100 - 25) = 75 Позиция в Acc3 будет 0. Обновленная позиция в Acc1, которая равна 75, теперь будет использоваться при обработке второй строки.

2.После обработки строки 2 - Позиция в Acc1 будет 0. Положение в Acc2 будет (75-150) -75.

Результирующий фрейм данных:

+-------------+----------+----------+------
ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
+-------------+----------+----------+------
  1|      secId|   Acc1|       0|
  2|      secId|   Acc2|       -75|
  3|      secId|   Acc3|       0|

Что происходит, когда строка 2 обрабатывается после строки 1? (мы не хотим этого)

  1. После обработки строки 2 - Позиция в Acc1 будет 0 Позиция в Acc2 будет (100-150) -50. Обновленная позиция в Acc1, которая равна 0, теперь будет использоваться при обработке первого ряда.
  2. После обработки строки 1 - Позиция в Acc1 будет 0. Позиция в Acc3 останется неизменной при -25.

Результирующий фрейм данных:

+-------------+----------+----------+------
ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
+-------------+----------+----------+------
  1|      secId|   Acc1|       0|
  2|      secId|   Acc2|       -50|
  3|      secId|   Acc3|       -25|

Как вы видели выше, порядок обработки в группе определяет наш вывод

Я также хотел спросить - почему не искра поддерживает последовательнуюобработка в разделе данных кадра? мы говорим, что нам нужны возможности параллельной обработки кластера. Вот почему мы разделяем фрейм данных на группы и просим кластер применять логику к этим группам параллельно. Все, что мы говорим: если группа имеет, скажем, 100 строк, то пусть эти 100 строк обрабатываются 1 за другим в порядке. Разве это не поддерживается искрой?

Если это не так, то какие еще технологии в больших данных могут помочь достичь этого?

Альтернативная реализация:

  1. Разбейте информационный фрейм на столько разделов, сколько на количество групп (в нашем случае - 50000. Групп больше, чем строк в любой группе). не более нескольких сотен).
  2. Запустить действие 'ForeachPartition' для фрейма данных, где логика выполняется независимо между разделами.
  3. записать выходные данные обработки каждого раздела в кластер.
  4. После обработки всего фрейма данных отдельное задание будет читать эти отдельные файлы с шага 3 и записывать в один файл / фрейм данных.

Я сомневаюсь, еслиТысячи разделов - это хорошо, но все же хотелось бы знать, звучит ли подход хорошо.

1 Ответ

0 голосов
/ 21 октября 2019

Концепция работает достаточно хорошо, пока не соблюдается это правило:

Вторая причина заключается в том, что когда заданное длинное количество и короткое количество не равны, то остаточное количество может быть использовано для спаривания. то есть, если длинное количество осталось, то оно может быть соединено со следующим коротким количеством, доступным в группе согласно приоритету или наоборот.

Это потому, что вы хотите итерацию, циклс логикой зависимостей, это трудно кодировать с помощью Spark, который более ориентирован на поток.

Я также работал над проектом, где все было заявлено - делайте это в Big Data с помощью Spark, scala или pyspark. Будучи архитектором и программистом, я смотрел на алгоритм для чего-то похожего на вашу область, но не совсем, в котором для товаров все периоды для набора точек данных должны быть классифицированы как быки,медведь или нет. Как и ваш алгоритм, но все же другой, я не знал, какое количество циклов нужно сделать заранее. На самом деле мне нужно было что-то сделать, а затем решить повторить что-то слева и справа от периода, который я пометил как бык, или медведь, или ничего, потенциально. Условия прекращения были обязательны. Смотрите картинку ниже. Вроде как «плоский» обход бинарного дерева, пока не исчерпаны все пути. Не то чтобы Spark-ish.

enter image description here

Я на самом деле - для академических целей - решил мою конкретную ситуацию в Spark, но это былоакадемическое упражнение. Дело в том, что этот тип обработки - мой пример и ваш пример - плохо подходит для Spark. Мы выполнили эти вычисления в ORACLE и просто в квотах результатов в хранилище данных Hadoop.

Поэтому я советую вам не пытаться делать это в Spark, так какне подходит для случаев использования достаточно хорошо. Поверь мне, это становится грязным. Если честно, вскоре мне стало ясно, что этот тип обработки был проблемой. Но когда вы начинаете, это общий аспект запроса.

...