Я работаю над проектом по обнаружению мошеннических транзакций, который использует искру и в основном использует основанный на правилах подход для оценки риска входящих транзакций.Для этого подхода, основанного на правилах, из исторических данных создается несколько карт для представления различных шаблонов в транзакциях, которые затем используются позже при оценке транзакции.В связи с быстрым увеличением размера данных, мы сейчас модифицируем код для создания этих карт на каждом уровне аккаунта.
более ранний код был, например, для
createProfile(clientdata)
, но теперь он становится
accountList.map(account=>createProfile(clientData.filter(s"""account=${account}""")))
Используя этот подход,профили генерируются, но поскольку эти операции выполняются последовательно, следовательно, это не представляется возможным.
Кроме того, сама функция createProfile использует кадры данных, следовательно, sparkContext / SparkSessions приводит к проблемене могу отправить эти задачи на рабочие узлы, так как, по моему мнению, только драйвер может получить доступ к фреймам данных и sparkSession / sparkContext.Следовательно, следующий код не работает
import sparkSession.implicit._
val accountListRdd=accountList.toSeq.toDF("accountNumber")
accountList.rdd.map(accountrow=>createProfile(clientData.filter(s"""account=${accountrow.get(0).toString}""")))
Приведенный выше код не работает, но представляет логику для желаемого поведения вывода.
Другой подход, яРассматривается использование многопоточности на уровне драйвера с использованием scala Future. Но даже в этом сценарии много объектов jvm создаются в одном вызове функции createProfile
, поэтому увеличение потоков, даже если этот подход работает, может привести кмного объектов jvm, что само по себе может привести к проблемам с сборкой мусора и нехваткой памяти.
просто для того, чтобы определить временную перспективу, createProfile занимает в среднем около 10 минут для одной учетной записи, и у нас есть 3000 учетных записей, поэтому последовательно потребуется многодней.С многопоточностью, даже если мы достигнем коэффициента 10, это займет много дней.Поэтому нам нужен параллелизм порядка 100-х.
Одна из вещей, которая могла бы сработать, если бы она существовала, была ... скажем, есть ли что-то вроде искры groupBy в операции типа groupBY, где на первом уровне мы можем группировать по "account", а затемвыполнять другие операции (в настоящее время проблема заключается в том, что UDF не сможет обрабатывать операции, которые мы хотим выполнить)
Еще одно практически возможное решение - это способ, которым работает SPark Streaming - в нем есть метод forEachRDD
итакже параметр spark.streaming.concurrentjobs
, который позволяет параллельно обрабатывать несколько СДР.Я не уверен, как это работает, но, возможно, такая реализация может помочь.
Выше приведено описание проблемы и мои текущие взгляды на нее.
Пожалуйста, дайте мне знать, если у кого-нибудь есть какие-либо идеи относительноэтот!Также я предпочту логическое изменение, а не предложение другой технологии