Выполните поиск на широковещательной карте с условием значения столбца в Spark, используя Scala - PullRequest
4 голосов
/ 11 июня 2019

Я хочу выполнить поиск на myMap. Когда значение col2 равно «0000», я хочу обновить его значением, связанным с ключом col1. В противном случае я хочу сохранить существующее значение col2.

val myDF :

+-----+-----+
|col1 |col2 |
+-----+-----+
|1    |a    | 
|2    |0000 |
|3    |c    |
|4    |0000 |
+-----+-----+

val myMap : Map[String, String] ("2" -> "b", "4" -> "d")
val broadcastMyMap = spark.sparkContext.broadcast(myMap)

def lookup = udf((key:String) => broadcastMyMap.value.get(key))

myDF.withColumn("col2", when ($"col2" === "0000", lookup($"col1")).otherwise($"col2"))

Я использовал приведенный выше код в spark-shell, и он отлично работает, но когда я собираю jar-файл приложения и отправляю его в Spark с помощью spark-submit, выдается ошибка:

org.apache.spark.SparkException: Failed to execute user defined  function(anonfun$5: (string) => string)

Caused by: java.lang.NullPointerException

Есть ли способ выполнить поиск без использования UDF, который не является лучшим вариантом с точки зрения производительности или для исправления ошибки? Я думаю, что не могу просто использовать соединение, потому что некоторые значения myDF.col2, которые нужно сохранить, могут быть заменены в операции.

1 Ответ

1 голос
/ 11 июня 2019

ваш NullPointerException НЕ действителен. Я подтвердил пример программы, как показано ниже.
его прекрасно работающий штраф. Вы выполняете следующую программу.

package com.example

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction


object MapLookupDF {
  Logger.getLogger("org").setLevel(Level.OFF)

  def main(args: Array[String]) {
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder.
      master("local[*]")
      .appName("MapLookupDF")
      .getOrCreate()
    import spark.implicits._
    val mydf = Seq((1, "a"), (2, "0000"), (3, "c"), (4, "0000")).toDF("col1", "col2")
    mydf.show
    val myMap: Map[String, String] = Map("2" -> "b", "4" -> "d")
    println(myMap.toString)
    val broadcastMyMap = spark.sparkContext.broadcast(myMap)

    def lookup: UserDefinedFunction = udf((key: String) => {
      println("getting the value for the key " + key)
      broadcastMyMap.value.get(key)
    }
    )

    val finaldf = mydf.withColumn("col2", when($"col2" === "0000", lookup($"col1")).otherwise($"col2"))
    finaldf.show
  }
}

Результат:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+----+----+
|col1|col2|
+----+----+
|   1|   a|
|   2|0000|
|   3|   c|
|   4|0000|
+----+----+

Map(2 -> b, 4 -> d)
getting the value for the key 2
getting the value for the key 4
+----+----+
|col1|col2|
+----+----+
|   1|   a|
|   2|   b|
|   3|   c|
|   4|   d|
+----+----+

примечание: не будет существенной деградации для небольшой карты, транслируемой.

если вы хотите перейти с фреймом данных, вы можете перейти как конвертировать карту в фрейм данных

val df = myMap.toSeq.toDF("key", "val")

Map(2 -> b, 4 -> d) in dataframe format will be like
+----+----+
|key|val  |
+----+----+
|   2|   b|
|   4|   d|
+----+----+

, а затем присоединиться как this

DIY ...

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...