Моя задание зажигания генерирует задачу, не сериализуемую во время выполнения. Может кто-нибудь сказать мне, если я здесь что-то не так делаю?
@Component("loader")
@Slf4j
public class LoaderSpark implements SparkJob {
private static final int MAX_VERSIONS = 1;
private final AppProperties props;
public LoaderSpark(
final AppProperties props
) {
this.props = props;
}
@Override
public void run(SparkSession spark, final String... args) throws IOException {
HBaseUtil hBaseUtil = new HBaseUtil(props);
byte[][] prefixes = new byte[][]{toBytes("document"),
toBytes("dataSource"),
toBytes("hold:")};
Filter filter = new MultipleColumnPrefixFilter(prefixes);
Scan scan = new Scan();
scan.addFamily(toBytes("data"));
scan.setCaching(100000);
scan.setMaxVersions(MAX_VERSIONS);
scan.setFilter(filter);
JavaRDD<TestMethod> mapFileJavaRDD
= hBaseUtil.createScanRdd(spark, "TestTable", scan).mapPartitions(tuple -> {
return StreamUtils.asStream(tuple)
.map(this::extractResult)
.filter(Objects::nonNull)
.iterator();
});
Dataset<TestMethod> testDataset = spark.createDataset(mapFileJavaRDD.rdd(), bean(TestMethod.class));
testDataset.limit(100);
}
private TestMethod extractResult(Tuple2<ImmutableBytesWritable, Result> resultTuple) {
TestMethod.TestMethodBuilder testBuilder = TestMethod.builder();
Result result;
result = resultTuple._2();
CdfParser cdfParser = new CdfParser();
List<String> holdingId = new ArrayList<>();
testBuilder.dataSource(Bytes.toString(result.getValue(Bytes.toBytes("data"),
Bytes.toBytes("dataSource"))));
testBuilder.document(cdfParser.fromXml(result.getValue(Bytes.toBytes("data"),
Bytes.toBytes("document"))));
NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(Bytes.toBytes("data"));
for (byte[] bQunitifer : familyMap.keySet()) {
if (Bytes.toString(bQunitifer).contains("hold:")) {
LOG.info(Bytes.toString(bQunitifer));
holdingId.add(Bytes.toString(bQunitifer));
}
}
testBuilder.holding(holdingId);
return testBuilder.build();
}
}
ЗДЕСЬ есть трассировка стека:
2020-04-29 12:48:59,837 INFO [Thread-4]o.a.s.d.y.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: java.lang.IllegalStateException: Failed to execute CommandLineRunner
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:787)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:322)
at org.oclc.googlelinks.spark.SpringSparkJob.main(SpringSparkJob.java:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:694)
Caused by: org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)
at org.apache.spark.api.java.JavaRDDLike$class.mapPartitions(JavaRDDLike.scala:155)
at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitions(JavaRDDLike.scala:45)
at org.oclc.googlelinks.spark.job.LoaderSpark.run(LoaderSpark.java:79)
at org.oclc.googlelinks.spark.SpringSparkJob.run(SpringSparkJob.java:79)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784)
... 8 more