- Применение группировки к фрейму данных. Допустим, это привело к 100 группам по 10 строк в каждой.
- У меня есть функция, которую нужно применить к каждой группе. Это может происходить параллельно и в любом порядке (т. Е. На усмотрение выбора любой группы в любом порядке для выполнения).
- Но в группе мне нужна гарантия последовательной обработкистрок. Потому что после обработки каждой строки в группе я использую выходные данные при обработке любой из строк, оставшихся в группе.
Мы выбрали следующий подход, когда все работало на драйвере и не могло использовать параллельную обработку искрового кластера между узлами (как и ожидалось, производительность была очень плохой)
1) Breakосновной DF на несколько фреймов данных, помещенных в массив:
val securityIds = allocStage1DF.select("ALLOCONEGROUP").distinct.collect.flatMap(_.toSeq)
val bySecurityArray = securityIds.map(securityIds => allocStage1DF.where($"ALLOCONEGROUP" <=> securityIds))
2) Цикл по фрейму данных и переход к методу для обработки, строка за строкой сверху фрейма данных:
df.coalesce(1).sort($"PRIORITY" asc).collect().foreach({
row => AllocOneOutput.allocOneOutput(row)}
)
Мы ищем комбинацию параллельной и последовательной обработки.
Параллельная обработка на уровне группы. потому что все они являются независимыми группами и могут быть распараллелены.
В каждой группе строки должны обрабатываться одна за другой в последовательности, что очень важно для нашего варианта использования.
пример данных
Применить группирование по SECURITY_ID, CC, BU, MPU, что дает нам 2 группы сверху (SECID_1, CC_A, BU_A,MPU_A и SECID_2, CC_A, BU_A, MPU_A).
с помощью матрицы приоритетов (ничего, кроме таблицы ссылок для присвоения ранга строкам), мы переносим каждую группу в ниже:
Транспонированные данные
Каждая строка в вышеуказанной группе имеет приоритет и сортируется в указанном порядке. Теперь я хочу обрабатывать каждую строку одну за другой, передавая их функции и получая вывод, как показано ниже:
output
Подробное объяснение сценария использования:
- Базовый фрейм данных содержит данные о всех торговых позициях финансовой фирмы. некоторые клиенты покупают (долго) данный финансовый продукт (уникально идентифицируемый по securityId), а некоторые продают (коротко) его.
- Идея нашего приложения состоит в том, чтобы идентифицировать / связать длинные позиции и короткие позиции в заданном идентификаторе безопасности.
- Поскольку такое сопряжение происходит с идентификатором безопасности, мы сказали, что базовый фрейм данныхразделены на группы на основе этого идентификатора безопасности, и каждая группа может обрабатываться независимо.
- Почему мы ищем последовательную обработку внутри группы? Это происходит потому, что, когда в данной группе имеется много длинных позиций и много коротких позиций (как в примере данных), справочная таблица (матрица приоритетов) решает, с какой длинной позицией следует сопоставить короткую позицию. в основном, это дает порядок обработки.
- Вторая причина заключается в том, что когда заданное длинное количество и короткое количество не равны, то остаточное количество подходит для спаривания. то есть, если длинное количество осталось, то оно может быть соединено со следующим коротким количеством, доступным в группе согласно приоритету или наоборот.
- По причинам, упомянутым в 4 и 5, мы ищемобрабатывать строку за строкой в группе.
Выше приведены точки, описанные ниже с использованием набора данных.
Base DataFrame
+-------------+----------+----------+------
ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
+-------------+----------+----------+------
1| secId| Acc1| +100|
2| secId| Acc2| -150|
3| secId| Acc3| -25|
4| secId2| Acc3| -25|
5| secId2| Acc3| -25|
Базовый кадр данных разделенна основе группы по идентификатору безопасности. Давайте воспользуемся второй группой, как показано ниже
+-------------+----------+----------+------
ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
+-------------+----------+----------+------
1| secId| Acc1| +100|
2| secId| Acc2| -150|
3| secId| Acc3| -25|
. В вышеприведенном случае положительная позиция 100 может быть в паре с -50 или -25. Чтобы разорвать связь, следующая таблица ссылок, называемая приоритетной матрицей, помогает определить порядок.
+-------------+----------+----------+------
+vePositionAccount|-vePositionAccount| RANK
+-------------+----------+----------+------
Acc1| Acc3| 1|
Acc1| Acc2| 2|
, поэтому из вышеприведенной матрицы мы знаем, что сначала будут соединены строки № 1 и 3, а затем строки № 1 и2. Это тот порядок (последовательная обработка), о котором мы говорим. Позволяет соединить их сейчас, как показано ниже:
+-------------+----------+----------+------+-------------+----------+----------+------
+veROW_NO|+veSECURITY_ID| +veACCOUNT|+vePOSITION|+veROW_NO|+veSECURITY_ID| +veACCOUNT|+vePOSITION|
+-------------+----------+----------+------+-------------+----------+----------+------
1| secId| Acc1| +100| 3| secId| Acc3| -25|
1| secId| Acc1| +100| 2| secId| Acc2| -150|
Что происходит, когда строка 1 обрабатывается после строки 2? (это то, что нам нужно)
1.После обработки строки 1 - Позиция в Acc1 будет (100 - 25) = 75 Позиция в Acc3 будет 0. Обновленная позиция в Acc1, которая равна 75, теперь будет использоваться при обработке второй строки.
2.После обработки строки 2 - Позиция в Acc1 будет 0. Положение в Acc2 будет (75-150) -75.
Результирующий фрейм данных:
+-------------+----------+----------+------
ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
+-------------+----------+----------+------
1| secId| Acc1| 0|
2| secId| Acc2| -75|
3| secId| Acc3| 0|
Что происходит, когда строка 2 обрабатывается после строки 1? (мы не хотим этого)
- После обработки строки 2 - Позиция в Acc1 будет 0 Позиция в Acc2 будет (100-150) -50. Обновленная позиция в Acc1, которая равна 0, теперь будет использоваться при обработке первого ряда.
- После обработки строки 1 - Позиция в Acc1 будет 0. Позиция в Acc3 останется неизменной при -25.
Результирующий фрейм данных:
+-------------+----------+----------+------
ROW_NO|SECURITY_ID| ACCOUNT|POSITION|
+-------------+----------+----------+------
1| secId| Acc1| 0|
2| secId| Acc2| -50|
3| secId| Acc3| -25|
Как вы видели выше, порядок обработки в группе определяет наш вывод
Я также хотел спросить - почему не искра поддерживает последовательнуюобработка в разделе данных кадра? мы говорим, что нам нужны возможности параллельной обработки кластера. Вот почему мы разделяем фрейм данных на группы и просим кластер применять логику к этим группам параллельно. Все, что мы говорим: если группа имеет, скажем, 100 строк, то пусть эти 100 строк обрабатываются 1 за другим в порядке. Разве это не поддерживается искрой?
Если это не так, то какие еще технологии в больших данных могут помочь достичь этого?
Альтернативная реализация:
- Разбейте информационный фрейм на столько разделов, сколько на количество групп (в нашем случае - 50000. Групп больше, чем строк в любой группе). не более нескольких сотен).
- Запустить действие 'ForeachPartition' для фрейма данных, где логика выполняется независимо между разделами.
- записать выходные данные обработки каждого раздела в кластер.
- После обработки всего фрейма данных отдельное задание будет читать эти отдельные файлы с шага 3 и записывать в один файл / фрейм данных.
Я сомневаюсь, еслиТысячи разделов - это хорошо, но все же хотелось бы знать, звучит ли подход хорошо.