Искра UDF - классная приемная с JSON - PullRequest
0 голосов
/ 29 апреля 2019

Я использую java-искровой код, считывающий некоторые данные json и преобразующий одно из полей в верхний регистр через UDF Код работает нормально при работе в локальном режиме, но при работе в кластере (под kubernetes) я получаю ClassCastException, как это:

UDF1 uppercase = new UdfUppercase() ; 
session.udf().register("uppercasefunction",uppercase , DataTypes.StringType) ; 

StructField[] structFields = new StructField[]{ 
        new StructField("intColumn", DataTypes.IntegerType, true, Metadata.empty()), 
        new StructField("stringColumn", DataTypes.StringType, true, Metadata.empty()) 
}; 
StructType structType = new StructType(structFields); 

List<String> jsonData = ImmutableList.of( 
        "{\"intColumn\":1,\"stringColumn\":\"Miami\"}"); 

Dataset<String> anotherPeopleDataset = session.createDataset(jsonData, Encoders.STRING()); 
Dataset<Row> anotherPeople = session.read().schema(structType).json(anotherPeopleDataset);            
anotherPeople.show(false); 

Dataset<Row> dfupercase = anotherPeople.select(callUDF("uppercasefunction", col("stringColumn"))); 
dfupercase.show(false); 
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
    at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Любая помощь будет оценена

1 Ответ

0 голосов
/ 02 мая 2019

Проблема решена, теперь она была связана с некоторым конфликтом jar с весенней загрузкой. Classcastexception вводит в заблуждение, потому что у нас складывается впечатление, что каким-то образом наш массив данных плохо сериализуется (разница между локальным и кластерным), в то время как на самом деле он не имеет ничего общегос кодом istelf но баночка конфликтует

...