Spark Scala - параллельная массовая обработка различных дочерних фреймов данных - PullRequest
1 голос
/ 30 мая 2019

Я работаю над проектом по обнаружению мошеннических транзакций, который использует искру и в основном использует основанный на правилах подход для оценки риска входящих транзакций.Для этого подхода, основанного на правилах, из исторических данных создается несколько карт для представления различных шаблонов в транзакциях, которые затем используются позже при оценке транзакции.В связи с быстрым увеличением размера данных, мы сейчас модифицируем код для создания этих карт на каждом уровне аккаунта.

более ранний код был, например, для

createProfile(clientdata)

, но теперь он становится

accountList.map(account=>createProfile(clientData.filter(s"""account=${account}""")))

Используя этот подход,профили генерируются, но поскольку эти операции выполняются последовательно, следовательно, это не представляется возможным.

Кроме того, сама функция createProfile использует кадры данных, следовательно, sparkContext / SparkSessions приводит к проблемене могу отправить эти задачи на рабочие узлы, так как, по моему мнению, только драйвер может получить доступ к фреймам данных и sparkSession / sparkContext.Следовательно, следующий код не работает

import sparkSession.implicit._ val accountListRdd=accountList.toSeq.toDF("accountNumber") accountList.rdd.map(accountrow=>createProfile(clientData.filter(s"""account=${accountrow.get(0).toString}""")))

Приведенный выше код не работает, но представляет логику для желаемого поведения вывода.

Другой подход, яРассматривается использование многопоточности на уровне драйвера с использованием scala Future. Но даже в этом сценарии много объектов jvm создаются в одном вызове функции createProfile, поэтому увеличение потоков, даже если этот подход работает, может привести кмного объектов jvm, что само по себе может привести к проблемам с сборкой мусора и нехваткой памяти.

просто для того, чтобы определить временную перспективу, createProfile занимает в среднем около 10 минут для одной учетной записи, и у нас есть 3000 учетных записей, поэтому последовательно потребуется многодней.С многопоточностью, даже если мы достигнем коэффициента 10, это займет много дней.Поэтому нам нужен параллелизм порядка 100-х.

Одна из вещей, которая могла бы сработать, если бы она существовала, была ... скажем, есть ли что-то вроде искры groupBy в операции типа groupBY, где на первом уровне мы можем группировать по "account", а затемвыполнять другие операции (в настоящее время проблема заключается в том, что UDF не сможет обрабатывать операции, которые мы хотим выполнить)

Еще одно практически возможное решение - это способ, которым работает SPark Streaming - в нем есть метод forEachRDD итакже параметр spark.streaming.concurrentjobs, который позволяет параллельно обрабатывать несколько СДР.Я не уверен, как это работает, но, возможно, такая реализация может помочь.

Выше приведено описание проблемы и мои текущие взгляды на нее.

Пожалуйста, дайте мне знать, если у кого-нибудь есть какие-либо идеи относительноэтот!Также я предпочту логическое изменение, а не предложение другой технологии

...