Сбой десериализации большого KiePackage с java.io.OptionalDataException для большого количества правил.
Я пытаюсь запустить движок правил Drools вместе с Apache Beam, работающим на Spark. Один из многих шаблонов правил (приведён ниже) не удаётся десериализовать при большом количестве правил.
Хотя я понимаю, что это конкретное правило можно реализовать с помощью Join, я хочу понять причину этой проблемы.
Буду очень признателен за любые рекомендации.
template header
id
field1
field2
Length
package rules;
dialect "mvel"
import data.SomeObject;
template "Rule1"
rule "Rule1 @{id}"
// The id placeholder is part of the rule name, so each generated rule is
// named after its id value (presumably unique per data row - confirm).
// Placeholder handling in the constraints below: field1 and Length are
// substituted as raw expressions, while field2 is substituted inside a
// quoted string literal.
when
// Binds $ob to a SomeObject fact matching all three constraints.
// NOTE(review): "!." looks like Drools' null-safe dereference, guarding
// subProp2 and prop2 against null - confirm against the Drools docs.
$ob : SomeObject (
prop1.subProp1.field1 == @{field1},
prop1.subProp1.field2 == "@{field2}",
prop1.subProp2!.prop2!.length == @{Length}
)
then
// do something
end
end template
/**
 * Coder for {@link KiePackage} values that delegates the byte-level work to
 * {@link DroolsStreamUtils}.
 */
public class KiePackageCoder extends CustomCoder<KiePackage> {

    /**
     * Serializes {@code value} onto {@code outStream} through
     * {@code DroolsStreamUtils.streamOut}.
     *
     * @param value     the package to write
     * @param outStream destination stream
     * @throws IOException if the underlying write fails
     */
    @Override
    public void encode(KiePackage value, OutputStream outStream) throws CoderException, IOException {
        DroolsStreamUtils.streamOut(outStream, value);
    }

    /**
     * Reads one object from {@code inStream} through
     * {@code DroolsStreamUtils.streamIn}, resolving classes with the class
     * loader that loaded this coder, and returns it as a {@link KiePackage}.
     *
     * @param inStream source stream
     * @return the decoded package
     * @throws CoderException if a class needed by the stream cannot be found
     * @throws IOException    if the underlying read fails
     */
    @Override
    public KiePackage decode(InputStream inStream) throws CoderException, IOException {
        final ClassLoader coderLoader = getClass().getClassLoader();
        final Object restored;
        try {
            restored = DroolsStreamUtils.streamIn(inStream, coderLoader);
        } catch (ClassNotFoundException missingClass) {
            // Surface class-resolution failures as a coder error, keeping the cause.
            throw new CoderException(missingClass);
        }
        return (KiePackage) restored;
    }
}
java.lang.RuntimeException: java.lang.IllegalStateException: Error decoding bytes for coder: WindowedValue$FullWindowedValueCoder(...KiePackageCoder@cd8a5fa,GlobalWindow$Coder)
at org.apache.beam.runners.spark.translation.TranslationUtils.lambda$null$2(TranslationUtils.java:296)
at org.apache.beam.runners.spark.repackaged.com.google.common.collect.Iterators$7.transform(Iterators.java:750)
at org.apache.beam.runners.spark.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
at org.apache.beam.runners.spark.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
at org.apache.beam.runners.spark.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
at org.apache.beam.runners.spark.translation.SparkProcessContext.processPartition(SparkProcessContext.java:64)
at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:155)
at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:58)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Error decoding bytes for coder: WindowedValue$FullWindowedValueCoder(...KiePackageCoder@cd8a5fa,GlobalWindow$Coder)
at org.apache.beam.runners.spark.coders.CoderHelpers.fromByteArray(CoderHelpers.java:91)
at org.apache.beam.runners.spark.coders.CoderHelpers.lambda$null$1(CoderHelpers.java:181)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.beam.runners.spark.coders.CoderHelpers.lambda$fromByteFunctionIterable$c79d5cae$1(CoderHelpers.java:182)
at org.apache.beam.runners.spark.translation.TranslationUtils.lambda$null$2(TranslationUtils.java:294)
... 33 more
Caused by: java.io.OptionalDataException
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1592)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at java.util.HashSet.readObject(HashSet.java:341)
at sun.reflect.GeneratedMethodAccessor35.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2177)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at org.drools.core.rule.MVELDialectRuntimeData.readExternal(MVELDialectRuntimeData.java:101)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2117)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2066)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at java.util.HashMap.readObject(HashMap.java:1409)
at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2177)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at org.drools.core.rule.DialectRuntimeRegistry.readExternal(DialectRuntimeRegistry.java:60)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2117)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2066)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at org.drools.core.definitions.impl.KnowledgePackageImpl.readExternal(KnowledgePackageImpl.java:299)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2117)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2066)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at org.drools.core.util.DroolsStreamUtils.streamIn(DroolsStreamUtils.java:205)
at org.drools.core.util.DroolsStreamUtils.streamIn(DroolsStreamUtils.java:189)
at ... KiePackageCoder.decode(KiePackageCoder.java:21)
at ... KiePackageCoder.decode(KiePackageCoder.java:12)
at org.apache.beam.sdk.coders.Coder.decode(Coder.java:170)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
at org.apache.beam.runners.spark.coders.CoderHelpers.fromByteArray(CoderHelpers.java:89)
... 44 more