java.lang.StackOverflowError при выполнении потоковой передачи Spark - PullRequest
0 голосов
/ 25 марта 2019

Я делаю Spark Streaming для анализа некоторых сообщений kafka в режиме реального времени. Прежде чем разобрать сообщение, я читаю какой-то файл из локального файла и создаю две переменные GridMatrix GM и LinkMatcher LM, которые полезны для анализа. Вот код, который дает мне java.lang.StackOverflowError, когда я отправляю его, используя spark-submit xxx.jar:

public class Stream implements Serializable {
    GridMatrix GM = GridMatrixConstructor.init_Grid_Matrix(0.001);
    LinkMatcher LM = new LinkMatcher();

    public void parse_rdd_record(String[] fields) {
        try {
            System.out.println(InetAddress.getLocalHost().getHostName() + "---->" + Thread.currentThread());
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(LM.GF.toString());
        System.out.println(GM.topleft_x);
    }

    public void Streaming_process() throws Exception {
        SparkConf conf = new SparkConf()
                .setAppName("SparkStreaming")
                .setMaster("local[*]");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(new Class<?>[]{
                Class.forName("Streaming.Stream")
        });


        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "xxx.xx.xx.xx:20103,xxx.xx.xx.xx:20104,xxx.xx.xx.xx:20105");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("nc_topic_gis_test");
        JavaInputDStream<ConsumerRecord<String, String>> GPS_DStream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        JavaPairDStream<String, String> GPS_DStream_Pair =  GPS_DStream.mapToPair(
                (PairFunction<ConsumerRecord<String, String>, String, String>) record ->
                        new Tuple2<>("GPSValue", record.value()));

        GPS_DStream_Pair.foreachRDD(PairRDD -> PairRDD.foreach(rdd -> {
            String[] fields = rdd._2.split(",");
            this.parse_rdd_record(fields);
        }));

        ssc.start();
        ssc.awaitTermination();
    }

    public static void main(String[] args) throws Exception {
        new Stream().Streaming_process();
    }
}

Это дает мне следующую ошибку:

Exception in thread "streaming-job-executor-0" java.lang.StackOverflowError
        at java.io.Bits.putDouble(Bits.java:121)
        at java.io.ObjectStreamClass$FieldReflector.getPrimFieldValues(ObjectStreamClass.java:2168)
        at java.io.ObjectStreamClass.getPrimFieldValues(ObjectStreamClass.java:1389)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1533)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at java.util.HashMap.internalWriteEntries(HashMap.java:1790)
        at java.util.HashMap.writeObject(HashMap.java:1363)
        at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at java.util.HashMap.internalWriteEntries(HashMap.java:1790)
        at java.util.HashMap.writeObject(HashMap.java:1363)
        at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

Однако, если я заменим GM и LM на статическую переменную, все будет хорошо. Измените строку 2 и строку 3 на:

private static final GridMatrix GM = GridMatrixConstructor.init_Grid_Matrix(0.001);
private static final LinkMatcher LM = new LinkMatcher();

Может кто-нибудь сказать мне, почему он не будет работать без статических переменных?

1 Ответ

0 голосов
/ 25 марта 2019

Разница между статической и нестатической версией состоит в том, что, когда она не статична, она отправляет их всем рабочим как закрытие Stream , когда статическое значение не по умолчанию, за исключением того, что оно используется одним из потоковой лямбды, которая являетсяне случай.

При отправке рабочим пытается сериализовать объект.И происходит сбой согласно предоставленной трассировке стека.Причина, скорее всего, в том, что эти структуры имеют циклическое объявление внутри себя и не могут быть должным образом сериализуемы.

...