Scala UDF возвращает «Схема для типа Unit не поддерживается» - PullRequest
0 голосов
/ 02 июля 2018

Я хочу внести изменения в столбец в кадре данных. Столбец представляет собой массив для целых чисел. Я хочу заменить элементы массива, взяв индекс из другого массива и заменив этот элемент элементом из третьего массива. Пример: у меня есть три столбца C1, C2, C3 все три массива. Я хочу заменить элементы в C3 следующим образом.

C3[C2[i]] = C1[i].

Я написал следующий UDF:

def UpdateHist = udf((CRF_count: Seq[Long], Day: Seq[String], History: Seq[Int])=> for(i <- 0 to Day.length-1){History.updated(Day(i).toInt-1 , CRF_count(i).toInt)})

и выполнил это:

histdate3.withColumn("History2", UpdateHist2(col("CRF_count"), col("Day"), col("History"))).show()

Но возвращается ошибка, как показано ниже:

scala> histdate3.withColumn("History2", UpdateHist2(col("CRF_count"), col("Day"), col("History"))).show()

java.lang.UnsupportedOperationException: схема для типа Unit не поддерживается в org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor (ScalaReflection.scala: 733) в org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor (ScalaReflection.scala: 671) в org.apache.spark.sql.functions $ .udf (functions.scala: 3100) на UpdateHist2 (: 25) ... 48 исключено

Я думаю, что возвращаю какой-то другой тип, тип View, который не поддерживается. Пожалуйста, помогите мне, как я могу решить эту проблему.

1 Ответ

0 голосов
/ 02 июля 2018

Ваш цикл for возвращает Unit, следовательно, сообщение об ошибке. Вы можете использовать for-yield для возврата значений, но поскольку Seq должно быть updated последовательно, простой foldLeft будет работать лучше:

import org.apache.spark.sql.functions._

val df = Seq(
  (Seq(101L, 102L), Seq("1", "2"), Seq(11, 12)),
  (Seq(201L, 202L, 203L), Seq("2", "3"), Seq(21, 22, 23))
).toDF("C1", "C2", "C3")
// +---------------+------+------------+
// |C1             |C2    |C3          |
// +---------------+------+------------+
// |[101, 102]     |[1, 2]|[11, 12]    |
// |[201, 202, 203]|[2, 3]|[21, 22, 23]|
// +---------------+------+------------+

def updateC3 = udf( (c1: Seq[Long], c2: Seq[String], c3: Seq[Int]) =>
  c2.foldLeft( c3 ){ (acc, i) =>
    val idx = i.toInt - 1
    acc.updated(idx, c1(idx).toInt)
  }
)

df.withColumn("C3", updateC3($"C1", $"C2", $"C3")).show(false)
// +---------------+------+--------------+
// |C1             |C2    |C3            |
// +---------------+------+--------------+
// |[101, 102]     |[1, 2]|[101, 102]    |
// |[201, 202, 203]|[2, 3]|[21, 202, 203]|
// +---------------+------+--------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...