Я пытаюсь запустить пользовательскую функцию параллельно для некоторых параметров, хранящихся в RDD в spark scala. Я предполагаю, что использование карты должно дать результат. Я получаю сообщение об ошибке при передаче пользовательской функции, которую я определил. Нет ошибки, если я передам какую-то стандартную функцию (например, 'length'). Тем не менее, это не похоже на проблему с конкретной моей функцией, потому что, даже если я передаю пустую пользовательскую функцию, она все равно не работает. Я был бы признателен за любое предложение.
Если быть точным, то df_short - это фрейм данных с 6 столбцами и 1 строкой. :
df_short.rdd.map(i => i.length).collect
Следующее получение пользовательской функции - функция, которая возвращает только 0
def grid_search_2(prm1: Int, prm2: Double, prm3: Double, prm4: Double,
prm5: Double, prm6: Double): Int = {
return 0
}
Теперь попытка передать пользовательскую функцию приводит к ошибке:
df_short.rdd.map(i => grid_search_2(i.getInt(0), i.getDouble(1),
i.getDouble(2), i.getDouble(3), i.getDouble(4), i.getDouble(5))).collect
Я получаю следующую ошибку:
va.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
.
.
.
.
Ошибка очень длинная, и я могу вставить больше. Я был бы признателен за любую помощь в выяснении, почему происходит эта ошибка - до сих пор мне не очень повезло с поиском решения. Я использую версию 2.4.3, версию scala 2.11.12 и пишу код в блокноте Zeppelin. Спасибо!