Задача не сериализуема: Spark - PullRequest
1 голос
/ 29 апреля 2020

Моя задание зажигания генерирует задачу, не сериализуемую во время выполнения. Может кто-нибудь сказать мне, если я здесь что-то не так делаю?

@Component("loader")
    @Slf4j
public class LoaderSpark implements SparkJob {
    private static final int MAX_VERSIONS = 1;

    private final AppProperties props;


    public LoaderSpark(
            final AppProperties props
    ) {
        this.props = props;
    }


    @Override
    public void run(SparkSession spark, final String... args) throws IOException {

        HBaseUtil hBaseUtil = new HBaseUtil(props);

        byte[][] prefixes = new byte[][]{toBytes("document"),
                toBytes("dataSource"),
                toBytes("hold:")};

        Filter filter = new MultipleColumnPrefixFilter(prefixes);

        Scan scan = new Scan();
        scan.addFamily(toBytes("data"));
        scan.setCaching(100000);
        scan.setMaxVersions(MAX_VERSIONS);
        scan.setFilter(filter);


        JavaRDD<TestMethod> mapFileJavaRDD
                = hBaseUtil.createScanRdd(spark, "TestTable", scan).mapPartitions(tuple -> {

            return StreamUtils.asStream(tuple)
                    .map(this::extractResult)
                    .filter(Objects::nonNull)
                    .iterator();

        });


        Dataset<TestMethod> testDataset = spark.createDataset(mapFileJavaRDD.rdd(), bean(TestMethod.class));
        testDataset.limit(100);

    }

    private TestMethod extractResult(Tuple2<ImmutableBytesWritable, Result> resultTuple) {


        TestMethod.TestMethodBuilder testBuilder = TestMethod.builder();
        Result result;
        result = resultTuple._2();
        CdfParser cdfParser = new CdfParser();

        List<String> holdingId = new ArrayList<>();

        testBuilder.dataSource(Bytes.toString(result.getValue(Bytes.toBytes("data"),
                Bytes.toBytes("dataSource"))));
        testBuilder.document(cdfParser.fromXml(result.getValue(Bytes.toBytes("data"),
                Bytes.toBytes("document"))));

        NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(Bytes.toBytes("data"));
        for (byte[] bQunitifer : familyMap.keySet()) {

            if (Bytes.toString(bQunitifer).contains("hold:")) {

                LOG.info(Bytes.toString(bQunitifer));
                holdingId.add(Bytes.toString(bQunitifer));

            }
        }
        testBuilder.holding(holdingId);

        return testBuilder.build();
    }

}

ЗДЕСЬ есть трассировка стека:

  2020-04-29 12:48:59,837 INFO  [Thread-4]o.a.s.d.y.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: java.lang.IllegalStateException: Failed to execute CommandLineRunner
        at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:787)
        at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:322)
        at org.oclc.googlelinks.spark.SpringSparkJob.main(SpringSparkJob.java:56)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:694)
    Caused by: org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)
        at org.apache.spark.api.java.JavaRDDLike$class.mapPartitions(JavaRDDLike.scala:155)
        at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitions(JavaRDDLike.scala:45)
        at org.oclc.googlelinks.spark.job.LoaderSpark.run(LoaderSpark.java:79)
        at org.oclc.googlelinks.spark.SpringSparkJob.run(SpringSparkJob.java:79)
        at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784)
        ... 8 more

Ответы [ 2 ]

0 голосов
/ 30 апреля 2020

Просто сделайте функцию extractResult stati c. Чтобы вызвать метод stati c, вам не нужно сериализовать класс, вам нужно, чтобы декларирующий класс был доступен для загрузчика классов (и это так, поскольку архивы jar могут быть общими для драйвера и работников. ).

Благодаря https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-apache-spark/

0 голосов
/ 29 апреля 2020

попробуйте добавить геттер и сеттеры для props

public void setProps(AppProperties props) {
        this.props = props;
    }

    public AppProperties getProps() {
        return props;
    }
...