Предположим, у меня есть фрейм данных Spark с именем trades
, который имеет в своей схеме несколько столбцов, некоторые измерения (скажем, Product и Тип ) и некоторые факты (скажем, Цена и Объем ).
Строки в кадре данных, имеющие одинаковые столбцы измерений, логически относятся к одной и той же группе.
Мне нужно сопоставить каждый набор измерений (Product, Type) с числовым значением, чтобыполучить в конце фрейм данных stats
, который имеет столько строк, сколько отдельного числа измерений и значение - это критическая часть, - которая получается из всех строк в trades
этого (Product, Type) и котораянеобходимо вычислять последовательно по порядку, потому что функция, применяемая строка за строкой, не является ни ассоциативной, ни коммутативной, и ее нельзя распараллелить.
Мне удалось обработать последовательную функцию, которую мне нужно применить к каждому подмножеству, путем перераспределения в 1отдельный блок данных для каждого кадра данных и сортировки строк, чтобы получить именно то, что мне нужно.
Я борюсь с тем, как сделать карту с trades
до stats
как работу Spark: в моемМастер сценариев удален и может использовать несколько исполнителей, в то время как режим развертывания является локальным, а локальный компьютер плохо оборудованPED.Поэтому я не хочу делать зацикливание на драйвере, но перенести его в кластер.
Если бы это был не Spark, я бы сделал что-то вроде:
val dimensions = trades.select("Product", "Type").distinct()
val stats = dimensions.map( row =>
val product = row.getAs[String]("Product")
val type = row.getAs[String]("Type")
val inScope = col("Product") === product and col("Type") === type
val tradesInScope = trades.filter(inScope)
Row(product, type, callSequentialFunction(tradesInScope))
)
Мне показалось, что это нормально, но совершенно не работает: я пытаюсь сделать вложенный вызов на trades
, и кажется, что они не поддерживаются.В самом деле, при выполнении этой компиляции задания искры, но при фактическом выполнении действия я получаю NullPointerException
, потому что фрейм данных trades
равен нулю в пределах map
Я новичок в Spark, и я неНе знаю другого способа достижения того же намерения действительным способом.Не могли бы вы помочь мне?