Я создаю UDF с помощью Spark и Drools. Когда я запускаю Java-код из Eclipse, всё работает без проблем, но когда я запускаю его через Drools, возникает ошибка приведения типов:
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.predicates of type scala.collection.Seq in instance of org.apache.spark.sql.execution.columnar.InMemoryTableScanExec в java.io.ObjectStreamClass$FieldReflector.setObjFieldValues (ObjectStreamClass.java:2287) в java.io.ObjectStreamClass.setObjFieldValues (ObjectStreamClass.Office.Javajava: 2293) в java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0 (ObjectInputSt).java.io.ObjectInputStream.defaultReadFields (OBJEctInputStream.java:2287) в java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.java15ject) в java.io.ObjectInputStream.defaultReadFields (ObjectInputStream.java:2287) в java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStio.69) jj.ObjectInputStream.readObject0 (ObjectInputStream.java:1573) при java.io.ObjectInputStream.defaultReadFields (ObjectInputStream.java:2287) при java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) по адресу java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1573) по адресу java.io.ObjectInputStream.defaultReadFields (ObjectInputStream.java:2287) по адресу java.io.ObjectInputStream.jj.) 
на java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1573) в java.io.ObjectInputStream.defaultReadFields (ObjectInputStream.java:2287Stb.Inava.ject.ject.ject.read.ject.read.ject.read.ject.ject.read.ject.ject.ject.read.ject.ject.read.read.ject.ject.ject.read.read.ject.ject.read.ject.read.ject.ject.read.read.ject.ject.read.ject.ject.read.ject.ject.read.ject.ject.read.In...java: 2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1573) в java.io.ObjectInputStream.readObject (ObjectInput431).в scala.collection.immutable.List $ SerializationProxy.readObject (List.scala: 479) в sun.reflect.GeneratedMethodAccessor11.invoke (неизвестный источник) в sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAavalang):.reflect.Method.invoke (Method.java:498) в java.io.ObjectStreamClass.invokeReadObject (ObjectStreamClass.java:1170) в java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2178) в java.put.Ob.readOrdinaryObject (ObjectInputStream.java: 2069) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1573) в java.io.ObjectInputStream.defaultReadFields (ObjectInputStream.java:2287) в java.io.ObjectInputStream.readSerialData (ObjectInputSt.jpg)java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1573) в java.io.ObjectInputStream.defaultReadFields (ObjectInputStream.Oava.java:jav:22).readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1573)в java.io.ObjectInputStream.readObject (ObjectInputStream.java:431) в scala.collection.immutable.List $ SerializationProxy.readObject (List.scala: 479) в sun.reflect.GeneratedMethodAccessor11.invoke (неизвестный 
источник) в sun.ref.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) при java.lang.reflect.Method.invoke (Method.java:498) в java.io.ObjectStreamClass.invokeReadObject (ObjectStreamClass.java:1170) в java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2178) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1573) в java.io.OreamjidInput: 2287) в java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.j).io.ObjectInputStream.defaultReadFields (ObjectInputStream.java:2287) в java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0) в java.io.ObjectInputStream.readObject (ObjectInputStream.java:431) в scala.collection.immutable.List $ SerializationProxy.readObject (List.scala: 479)
Правило Drools:
// Returns the input dataset with two extra string columns, "CR" and "CP",
// obtained by expanding a struct column produced by a Spark UDF.
// The UDF is registered on the dataset's own SparkSession under the name
// "categoryUDF" every time this function is called.
function Dataset categories(Dataset ds) {
    // Shape of the struct the UDF returns: two non-nullable string fields.
    StructType categorySchema = new StructType()
        .add("CR", DataTypes.StringType, false)
        .add("CP", DataTypes.StringType, false);

    // The UDF ignores its three arguments and always yields the row ("NULL", "NULL").
    UDF3<Integer, String, String, GenericRowWithSchema> categoryUDF =
        new UDF3<Integer, String, String, GenericRowWithSchema>() {
            public GenericRowWithSchema call(Integer r, String t, String q) throws Exception {
                String[] placeholders = {"NULL", "NULL"};
                return new GenericRowWithSchema(placeholders, categorySchema);
            }
        };

    ds.sparkSession().udf().register("categoryUDF", categoryUDF, categorySchema);

    // Attach the UDF result as a struct column named "categorie" ...
    Dataset withCategory = ds.withColumn(
        "categorie",
        functions.callUDF(
            "categoryUDF",
            functions.col("rlv_reason"),
            functions.col("rlv_type"),
            functions.col("qualification")));

    // ... then keep every original column and flatten the struct into CR / CP.
    return withCategory.select(ds.col("*"), functions.col("categorie.*"));
}
// Rule "test UDF": runs categories() on a Dataset fact and, if the result
// contains at least one row whose CP or CR column equals the string 'NULL',
// prints a message and displays the enriched dataset.
rule "test UDF"
// Belongs to the "data-enrichment" ruleflow group.
ruleflow-group "data-enrichment"
no-loop true
when
// Bind a Dataset fact from working memory to $ds.
$ds : Dataset();
// Call the categories() function on it and bind the returned Dataset to $data.
$data : Dataset() from categories($ds);
// Match only when the filtered result is non-empty.
// NOTE(review): count() presumably forces Spark to execute the plan, so this
// is likely where the reported ClassCastException surfaces — confirm.
eval (((Dataset)$data.filter("CP == 'NULL' or CR == 'NULL'")).count() > 0)
then
System.out.println("test UDF work" );
$data.show();
end
**Исполнение:**
// Obtain a stateless session from the project's helper.
StatelessKieSession kieSession = DroolsHelper.startStatelessKieSession();

// Commands run in list order: first insert the dataset as a fact,
// then start the process identified by processId.
Command insertDataset = CommandFactory.newInsert(dataset);
Command startProcess = CommandFactory.newStartProcess(processId);
List<Command> cmdList = Arrays.asList(insertDataset, startProcess);

// Submit both commands to the session as a single batch.
kieSession.execute(CommandFactory.newBatchExecution(cmdList));
Зависимости: