Spark - Как сделать карту сериализуемой - PullRequest
0 голосов
/ 19 сентября 2018

Мне нужно извлечь и преобразовать из большого набора данных некоторую информацию, которая впоследствии будет использоваться другим набором данных.

Поскольку используемая информация всегда одинакова и ее можно хранить в пареПо умолчанию, я собирался сохранить эту информацию в виде карты, которая будет использоваться udf, поэтому я избегаю нескольких обращений к большому набору данных.

Проблема в том, что я получаю следующееошибка:

org.apache.spark.SparkException: Task not serializable

Есть ли способ сделать сериализуемую карту?

Если это невозможно, есть ли другой способ сохранить информацию в объекте просмотра в Spark?

Вот мой код:

val cityTimeZone: scala.collection.immutable.Map[String,Double]  = Map("CEB" -> 8.0, "LGW" -> 0.0, "CPT" -> 2.0
, "MUC" -> 1.0, "SGN" -> 7.0, "BNE" -> 10.0, "DME" -> 3.0, "FJR" -> 4.0, "BAH" -> 3.0, "ARN" -> 1.0, "FCO" -> 1.0, "DUS" -> 1.0, "MRU" -> 4.0, "JFK" -> -5.0, "GLA" -> 0.0)

def getLocalHour = udf ((city:String, timeutc:Int) => {
    val timeOffset = cityTimeZone(city)
    val localtime = if((timeutc+timeOffset)%24 >= 0)(timeutc+timeOffset)%24 else ((timeutc+timeOffset)%24)*(-1)
    localtime
})

//$"dateutc" is a timestamp column like this: 2017-03-01 03:45:00 and $"city" a 3 letters code in capitals, like those in the map above

val newDF = DF
.select("dateutc","city")
.withColumn("utchour", hour($"dateutc"))
.withColumn("localhour", getLocalHour($"city", $"utchour"))

display(newDF)

1 Ответ

0 голосов
/ 19 сентября 2018

Объявление переменной-члена

val cityTimeZone  

в сочетании с

cityTimeZone(city)

внутри udf проблематично, поскольку последнее является просто ярлыком для

this.cityTimeZone(city)

где this - это (предположительно) какой-то огромный несериализуемый объект (возможно, потому что он содержит ссылку на несериализуемый контекст искры).

Создайте getLocalHour a lazy val и переместитекарта, которая нужна udf внутри определения getLocalHour как локальной переменной, что-то вроде этого:

lazy val getLocalHour = {
  val cityTimeZone: Map[String, Double] = Map("CEB" -> 8.0, "LGW" -> 0.0)
  udf ((city:String, timeutc:Int) => {
    val timeOffset = cityTimeZone(city)
    val localtime = if((timeutc+timeOffset)%24 >= 0)(timeutc+timeOffset)%24 else ((timeutc+timeOffset)%24)*(-1)
    localtime
  })
}

В качестве альтернативы, присоедините cityTimeZone к некоторому сериализуемому объект (т. е. некоторый объект, который не содержит ссылок на какие-либо потоки, сокеты, контексты искры и все другие несериализуемые вещи; например, объекты пакета с утилитарными методами и константами будут в порядке).

Если udf определение содержит ссылки на любые другие переменные-члены, относитесь к ним соответственно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...