Я пытаюсь добавить фрейм данных одного столбца в больший фрейм данных, однако проблема с первым фреймом данных заключается в том, что после его создания и попытки добавить его в основной фрейм данных с помощью команды:
df.withColumn("name", dataframe)
Я получаю сообщение об ошибке:
**found : org.apache.spark.sql.DataFrame
(which expands to) org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
required: org.apache.spark.sql.Column**
Я понимаю, что набор данных [строка] должен быть синонимичен с фреймом данных, однако я не уверен, как обойти эту ошибку.
Для контекста (действительно) разбавленная версия моего кода приведена ниже:
// test function - will be used as part of the main script below
def Test(inputone: Double, inputtwo: Double): Double = {
var test = (2 * inputone) + inputtwo
test
}
Для основного сценария (т. Е. Где проблема)
//Importing the data via CSV
var df = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load("/root/file.csv")
Чтобы дать контексто том, как выглядят данные:
df: org.apache.spark.sql.DataFrame = [ID: int, blue: int ... 8 more fields]
+---+----+------+-----+------+------+----+---+-----+-----+
| ID|blue|purple|green|yellow|orange|pink|red|white|black|
+---+----+------+-----+------+------+----+---+-----+-----+
| 1| 500| 44| 0| 0| 3| 0| 5| 43| 2|
| 2| 560| 33| 1| 0| 4| 0| 22| 33| 4|
| 3| 744| 44| 1| 99| 3|1000| 78| 90| 0|
+---+----+------+-----+------+------+----+---+-----+-----+
root
|-- ID: integer (nullable = true)
|-- blue: integer (nullable = true)
|-- purple: integer (nullable = true)
|-- green: integer (nullable = true)
|-- yellow: integer (nullable = true)
|-- orange: integer (nullable = true)
|-- pink: integer (nullable = true)
|-- red: integer (nullable = true)
|-- white: integer (nullable = true)
|-- black: integer (nullable = true)
С этого момента сценарий продолжается
// Creating a list for which columns to draw from the main dataframe
val a = List("green", "blue")
// Creating the mini dataframe to perform the function upon
val test_df = df.select(a.map(col): _*)
// The new dataframe will now go through the 'Test' function defined above
val df_function = test_df.rdd.map(col => Test(col(0).toString.toDouble, col(1).toString.toDouble))
// Converting the RDD output back to a dataframe (of one column)
val df_convert = df_function.toDF
Для справки вывод выглядит следующим образом
+-----+
|value|
+-----+
|500.0|
|562.0|
|746.0|
+-----+
Последняя строка скрипта должна добавить его в основной фрейм данных следующим образом
df = df.withColumn("new column", df_convert)
Но, как указано выше, я получаю следующую ошибку:
found : org.apache.spark.sql.DataFrame
(which expands to) org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
required: org.apache.spark.sql.Column
////////// РЕДАКТИРОВАТЬ ////////////
@ user9819212 решение работает для упрощенных методов bно при вызове одного более сложного я получаю следующую ошибку
test2_udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function5>,DoubleType,Some(List(DoubleType, IntegerType, StringType, DoubleType, DoubleType)))
java.lang.ClassCastException: $anonfun$1 cannot be cast to scala.Function1
Поэтому я попытался создать еще одну упрощенную версию своего кода с несколькими дополнительными изменениями в тестовой функции, которая называется
// test function - will be used as part of the main script below
def Test (valueone: Double, valuetwo: Integer): Double = {
val test = if(valuetwo > 2000) valueone + 4000 else valueone
val fakeList = List(3000,4000,500000000)
val index = fakeList.indexWhere(x => x>=valueone)
val test2 = fakeList(index - 1) * valueone
test2
}
val test_udf = udf(Test _)
df = df.withColumn(
"new column",
test_udf(col("green").cast("double"), col("blue").cast("integer"))
)
Сначала кажется, что это работает, но когда я пытаюсь просмотреть кадр данных с помощью команды
df.show
, я получаю следующую ошибку
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 153.0 failed 1 times, most recent failure: Lost task 0.0 in stage 153.0 (TID 192, localhost, executor driver):
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (double, int) => double)