Подключение к блокам данных работает нормально, работа с DataFrames проходит гладко (такие операции, как объединение, фильтрация и т. Д. c). Проблема возникает, когда я вызываю cache
на фрейме данных.
py4j.protocol.Py4JJavaError: An error occurred while calling o342.cache.
: java.io.InvalidClassException: failed to read class descriptor
...
Caused by: java.lang.ClassNotFoundException: org.apache.spark.rdd.RDD$client53442a94a3$$anonfun$mapPartitions$1$$anonfun$apply$23
at java.lang.ClassLoader.findClass(ClassLoader.java:523)
at org.apache.spark.util.ParentClassLoader.findClass(ParentClassLoader.java:35)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.java:40)
at org.apache.spark.util.ChildFirstURLClassLoader.loadClass(ChildFirstURLClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:257)
at org.apache.spark.sql.util.ProtoSerializer.org$apache$spark$sql$util$ProtoSerializer$$readResolveClassDescriptor(ProtoSerializer.scala:4316)
at org.apache.spark.sql.util.ProtoSerializer$$anon$4.readClassDescriptor(ProtoSerializer.scala:4304)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1857)
... 71 more
Я работаю с java8 по мере необходимости, очистка pycache не помогает. Тот же код, представленный в качестве задания для блоков данных, работает нормально. Это похоже на локальную проблему на уровне моста python -jvm, но java версии (8) и python (3.7) соответствуют требованиям. Переключение на java13 приводит к тому же сообщению.
Версии databricks-connect==6.2.0
, openjdk version "1.8.0_242"
, Python 3.7.6
РЕДАКТИРОВАТЬ: Поведение зависит от того, как создается DF, если источник DF является внешним тогда он работает нормально, если DF создается локально, то появляется такая ошибка.
# works fine
df = spark.read.csv("dbfs:/some.csv")
df.cache()
# ERROR in 'cache' line
df = spark.createDataFrame([("a",), ("b",)])
df.cache()