Вызов UDF для Dataframe с проблемой сериализации - PullRequest
1 голос
/ 10 марта 2019

Я просматривал некоторые примеры в блогах UDF, которые, кажется, работают, но на самом деле, когда я запускаю их, они дают печально известную задачу, а не сериализуемую ошибку.

Мне кажется странным, что это опубликовано, и неттакое упоминание сделано.Запуск Spark 2.4.

Код, довольно просто, должно быть, что-то изменилось в Spark?: https://medium.com/@mrpowers/spark-user-defined-functions-udfs-6c849e39443b

Что-то, должно быть, изменилось ???

Я посмотрел на верхний элемент голосования с здесь объектом и расширяет Serializable, но тоже не радует.Озадаченный.

РЕДАКТИРОВАТЬ

Кажется, что все изменилось, этот формат необходим:

val squared = udf((s: Long) => s * s)  

Объектный подход все еще интересует меня, почему он потерпел неудачу.

Ответы [ 2 ]

1 голос
/ 10 марта 2019

Я не мог воспроизвести ошибку (пробовал на свечах 1.6, 2.3 и 2.4), но я помню, что сталкивался с такого рода ошибками (давно).Я выскажу свое лучшее предположение.

Проблема возникает из-за разницы между методом и функцией в scala. Как подробно описано здесь .

Короткая версия этого, когда вы пишете def, эквивалентна методам в Java, то есть является частью класса a, и может быть вызвана с использованием экземпляракласс.

Когда вы пишете udf((s: Long) => s * s), он создает экземпляр черты Function1.Для этого генерируется анонимный класс, реализующий Function1, метод применения которого похож на def apply(s: Long):Long= {s * s}, и экземпляр этого класса передается в качестве параметра udf.

Однако, когда вы пишете udf[String, String](lowerRemoveAllWhitespace) метод lowerRemoveAllWhitespace необходимо преобразовать в Function1 экземпляр и передать в udf.Здесь происходит сбой сериализации, поскольку метод apply в этом экземпляре будет пытаться вызвать lowerRemoveAllWhitespace для экземпляра другого объекта (который не может быть сериализован и отправлен в рабочий процесс jvm), вызывая исключение.

0 голосов
/ 10 марта 2019

Пример, который был опубликован, был из авторитетного источника, но я не могу запустить без ошибки сериализации в Spark 2.4, попытка объектов и т. Д. Тоже не помогли.

Я решил проблему следующим образомиспользование подхода udf ((.., который выглядит как единственное возможное утверждение, и действительно, я мог это сделать и вуаля не сериализовался. Несколько иной пример, хотя и с использованием примитивов.

val sumContributionsPlus  = udf((n1: Int, n2: Int, n3: Int, n4: Int) => Seq(n1,n2,n3,n4).foldLeft(0)( (acc, a) => if (a > 0) acc + a else acc))

В заключение отметим, чтообсуждение UDF, родной Spark, UDF-столбцов сбивает с толку, когда вещи перестают работать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...