ClassCastException в простом приложении java spark + drools - PullRequest
0 голосов
/ 10 октября 2018

Я создаю метод UDF с помощью spark и drools, когда я запускаю java-сторону с eclipse, это происходит без проблем, но когда я делаю это с слюнами, это выдает мне ошибку приведения:

java.lang.ClassCastException: невозможно присвоить экземпляр scala.collection.immutable.List $ SerializationProxy полю org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.predicates типа scala.collection.Seq в случае org.apache.spark.sql.execution.columnar.InMemoryTableScanExec в java.io.ObjectStreamClass $ FieldReflector.setObjFieldValues ​​(ObjectStreamClass.java:2287) в java.io.ObjectStreamClass.setObjFieldValues ​​(ObjectStreamClass.Office.Javajava: 2293) в java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0 (ObjectInputSt).java.io.ObjectInputStream.defaultReadFields (OBJEctInputStream.java:2287) в java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.java15ject) в java.io.ObjectInputStream.defaultReadFields (ObjectInputStream.java:2287) в java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStio.69) jj.ObjectInputStream.readObject0 (ObjectInputStream.java:1573) при java.io.ObjectInputStream.defaultReadFields (ObjectInputStream.java:2287) при java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) по адресу java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1573) по адресу java.io.ObjectInputStream.defaultReadFields (ObjectInputStream.java:2287) по адресу java.io.ObjectInputStream.jj.) на java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1573) в java.io.ObjectInputStream.defaultReadFields (ObjectInputStream.java:2287Stb.Inava.ject.ject.ject.read.ject.read.ject.read.ject.ject.read.ject.ject.ject.read.ject.ject.read.read.ject.ject.ject.read.read.ject.ject.read.ject.read.ject.ject.read.read.ject.ject.read.ject.ject.read.ject.ject.read.ject.ject.read.In...java: 2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1573) в java.io.ObjectInputStream.readObject (ObjectInput431).в scala.collection.immutable.List $ SerializationProxy.readObject (List.scala: 479) в sun.reflect.GeneratedMethodAccessor11.invoke (неизвестный источник) в sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAavalang):.reflect.Method.invoke (Method.java:498) в java.io.ObjectStreamClass.invokeReadObject (ObjectStreamClass.java:1170) в java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2178) в java.put.Ob.readOrdinaryObject (ObjectInputStream.java: 2069) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1573) в java.io.ObjectInputStream.defaultReadFields (ObjectInputStream.java:2287) в java.io.ObjectInputStream.readSerialData (ObjectInputSt.jpg)java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1573) в java.io.ObjectInputStream.defaultReadFields (ObjectInputStream.Oava.java:jav:22).readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1573)в java.io.ObjectInputStream.readObject (ObjectInputStream.java:431) в scala.collection.immutable.List $ SerializationProxy.readObject (List.scala: 479) в sun.reflect.GeneratedMethodAccessor11.invoke (неизвестный источник) в sun.ref.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) при java.lang.reflect.Method.invoke (Method.java:498) в java.io.ObjectStreamClass.invokeReadObject (ObjectStreamClass.java:1170) в java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2178) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1573) в java.io.OreamjidInput: 2287) в java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.j).io.ObjectInputStream.defaultReadFields (ObjectInputStream.java:2287) в java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0) в java.io.ObjectInputStream.readObject (ObjectInputStream.java:431) в scala.collection.immutable.List $ SerializationProxy.readObject (List.scala: 479)


слюни правило

function Dataset categories(Dataset ds) {

StructType newSchema = new StructType()
      .add("CR", DataTypes.StringType, false)
      .add("CP", DataTypes.StringType, false);

UDF3<Integer, String, String, GenericRowWithSchema> categoryUDF = new UDF3<Integer, String, String, GenericRowWithSchema>() {

       public GenericRowWithSchema call(Integer r, String t, String q) throws Exception {
         return new GenericRowWithSchema(new String[]{"NULL", "NULL"}, newSchema);
            }
        };
 ds.sparkSession().udf().register("categoryUDF", categoryUDF, newSchema);

    return ds.withColumn("categorie",
           functions.callUDF("categoryUDF",
               functions.col("rlv_reason"),
               functions.col("rlv_type"),
               functions.col("qualification")))
       .select(ds.col("*"), functions.col("categorie.*"));
    }

rule "test UDF"
ruleflow-group "data-enrichment"
no-loop true
when
    $ds : Dataset();
    $data : Dataset() from categories($ds);
    eval (((Dataset)$data.filter("CP == 'NULL' or CR == 'NULL'")).count() > 0)
then
    System.out.println("test UDF work" );
    $data.show();
end

** Исполнение: **

   StatelessKieSession kieSession = DroolsHelper.startStatelessKieSession();
    List<Command> cmdList = Arrays.asList(
         CommandFactory.newInsert(dataset),               
         CommandFactory.newStartProcess(processId)
    );
    kieSession.execute(CommandFactory.newBatchExecution(cmdList));

Зависимости:

enter image description here

...