Карта Spark Scala не работает с пользовательской функцией - PullRequest
0 голосов
/ 10 октября 2019

Я пытаюсь запустить пользовательскую функцию параллельно для некоторых параметров, хранящихся в RDD в spark scala. Я предполагаю, что использование карты должно дать результат. Я получаю сообщение об ошибке при передаче пользовательской функции, которую я определил. Нет ошибки, если я передам какую-то стандартную функцию (например, 'length'). Тем не менее, это не похоже на проблему с конкретной моей функцией, потому что, даже если я передаю пустую пользовательскую функцию, она все равно не работает. Я был бы признателен за любое предложение.

Если быть точным, то df_short - это фрейм данных с 6 столбцами и 1 строкой. :

df_short.rdd.map(i => i.length).collect

Следующее получение пользовательской функции - функция, которая возвращает только 0

def grid_search_2(prm1: Int, prm2: Double, prm3: Double, prm4: Double, 
prm5: Double, prm6: Double): Int = {
return 0
}

Теперь попытка передать пользовательскую функцию приводит к ошибке:

df_short.rdd.map(i => grid_search_2(i.getInt(0), i.getDouble(1), 
i.getDouble(2), i.getDouble(3), i.getDouble(4), i.getDouble(5))).collect

Я получаю следующую ошибку:

va.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
.
.
.
.

Ошибка очень длинная, и я могу вставить больше. Я был бы признателен за любую помощь в выяснении, почему происходит эта ошибка - до сих пор мне не очень повезло с поиском решения. Я использую версию 2.4.3, версию scala 2.11.12 и пишу код в блокноте Zeppelin. Спасибо!

1 Ответ

0 голосов
/ 10 октября 2019

Возможно, потому что вы используете метод экземпляра в своем удаленном коде (а не когда вы используете .map(i => i.length), который является анонимной функцией).

Я просто догадываюсь: когда вы используете ваш экземплярный метод (который принадлежит «классу», который не виден непосредственно из Zeppelin-Notebook), Spark пытается сериализовать весь класс. Вместо этого вы можете попытаться определить объект функции:

val grid_search_2 = (prm1: Int, prm2: Double, prm3: Double, prm4: Double, prm5: Double, prm6: Double) =>  {0}

и посмотреть, имеет ли это какое-то значение

...