Как избежать необходимости вложенных вызовов в кадры данных Spark - которые не работают - PullRequest
0 голосов
/ 15 ноября 2018

Предположим, у меня есть фрейм данных Spark с именем trades, который имеет в своей схеме несколько столбцов, некоторые измерения (скажем, Product и Тип ) и некоторые факты (скажем, Цена и Объем ).

Строки в кадре данных, имеющие одинаковые столбцы измерений, логически относятся к одной и той же группе.

Мне нужно сопоставить каждый набор измерений (Product, Type) с числовым значением, чтобыполучить в конце фрейм данных stats, который имеет столько строк, сколько отдельного числа измерений и значение - это критическая часть, - которая получается из всех строк в trades этого (Product, Type) и котораянеобходимо вычислять последовательно по порядку, потому что функция, применяемая строка за строкой, не является ни ассоциативной, ни коммутативной, и ее нельзя распараллелить.

Мне удалось обработать последовательную функцию, которую мне нужно применить к каждому подмножеству, путем перераспределения в 1отдельный блок данных для каждого кадра данных и сортировки строк, чтобы получить именно то, что мне нужно.

Я борюсь с тем, как сделать карту с trades до stats как работу Spark: в моемМастер сценариев удален и может использовать несколько исполнителей, в то время как режим развертывания является локальным, а локальный компьютер плохо оборудованPED.Поэтому я не хочу делать зацикливание на драйвере, но перенести его в кластер.

Если бы это был не Spark, я бы сделал что-то вроде:

val dimensions = trades.select("Product", "Type").distinct()
val stats = dimensions.map( row =>
     val product = row.getAs[String]("Product")
     val type = row.getAs[String]("Type")
     val inScope = col("Product") === product and col("Type") === type
     val tradesInScope = trades.filter(inScope)
     Row(product, type, callSequentialFunction(tradesInScope))
)

Мне показалось, что это нормально, но совершенно не работает: я пытаюсь сделать вложенный вызов на trades, и кажется, что они не поддерживаются.В самом деле, при выполнении этой компиляции задания искры, но при фактическом выполнении действия я получаю NullPointerException, потому что фрейм данных trades равен нулю в пределах map

Я новичок в Spark, и я неНе знаю другого способа достижения того же намерения действительным способом.Не могли бы вы помочь мне?

1 Ответ

0 голосов
/ 15 ноября 2018

вы получаете NullpointerExecption, потому что вы не можете использовать кадры данных в коде на стороне исполнителя, они живут только в драйвере. Кроме того, ваш код не гарантирует, что callSequentialFunction будет вызываться последовательно, потому что map в кадре данных будет работать параллельно (если у вас более 1 раздела). Что вы можете сделать, это что-то вроде этого:

val dimensions = trades.select("Product", "Type").distinct().as[(String,String)].collect()

val stats = dimensions.map{case (product,type) => 
     val inScope = col("Product") === product and col("Type") === type
     val tradesInScope = trades.filter(inScope)
     (product, type, callSequentialFunction(tradesInScope))
}

Но обратите внимание, что порядок в dimensions несколько произвольный, поэтому вы должны отсортировать dimensions в соответствии с вашими потребностями

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...