столкнулся с проблемой в Spark структурированной потоковой передачи - PullRequest
0 голосов
/ 12 ноября 2019

Я написал код для чтения файла csf и распечатал его на консоли с помощью Spark Stuctured Stream. Код ниже -


    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.sql.*;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.types.StructType;
    import com.cybernetix.models.BaseDataModel;

    public class ReadCSVJob   {

        static List<BaseDataModel>  bdmList=new ArrayList<BaseDataModel>();

        public static void main(String args[]) {

             SparkSession spark = SparkSession
                      .builder()
                      .config("spark.eventLog.enabled", "false")
                      .config("spark.driver.memory", "2g")
                      .config("spark.executor.memory", "2g")
                      .appName("StructuredStreamingAverage")
                      .master("local")
                      .getOrCreate();



            StructType userSchema = new StructType();
            userSchema.add("name", "string");
            userSchema.add("status", "String");
            userSchema.add("u_startDate", "String");
            userSchema.add("u_lastlogin", "string");
            userSchema.add("u_firstName", "string");
            userSchema.add("u_lastName", "string");
            userSchema.add("u_phone","string");
            userSchema.add("u_email", "string")
                    ;

            Dataset<Row> dataset = spark.
                    readStream().
                    schema(userSchema)
                    .csv("D:\\user\\sdata\\user-2019-10-03_20.csv");


            dataset.writeStream()
            .format("console")
            .option("truncate","false")
            .start();


        }

    }

в этой строке кода userSchema.add ("name", "string"); , вызывающее завершение работы программы. Ниже приведена трассировка журнала.

ANTLR Tool version 4.7 used for code generation does not match the current runtime version 4.5.3ANTLR Runtime version 4.7 used for parser compilation does not match the current runtime version 4.5.3Exception in thread "main" java.lang.ExceptionInInitializerError  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:84)   at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseDataType(ParseDriver.scala:39)   at org.apache.spark.sql.types.StructType.add(StructType.scala:213)  at com.cybernetix.sparks.jobs.ReadCSVJob.main(ReadCSVJob.java:45) Caused by: java.lang.UnsupportedOperationException: java.io.InvalidClassException: org.antlr.v4.runtime.atn.ATN; Could not deserialize ATN with UUID 59627784-3be5-417a-b9eb-8131a7286089 (expected aadb8d7e-aeef-4415-ad2b-8204d6cf042e or a legacy UUID).   at org.antlr.v4.runtime.atn.ATNDeserializer.deserialize(ATNDeserializer.java:153)   at org.apache.spark.sql.catalyst.parser.SqlBaseLexer.<clinit>(SqlBaseLexer.java:1175)   ... 4 more Caused by: java.io.InvalidClassException: org.antlr.v4.runtime.atn.ATN; Could not deserialize ATN with UUID 59627784-3be5-417a-b9eb-8131a7286089 (expected aadb8d7e-aeef-4415-ad2b-8204d6cf042e or a legacy UUID).   ... 6 more

Я добавил ANTLR maven зависимость в файл pom.xml, но все еще сталкиваюсь с той же проблемой.

<!-- https://mvnrepository.com/artifact/org.antlr/antlr4 -->
<dependency>
    <groupId>org.antlr</groupId>
    <artifactId>antlr4</artifactId>
    <version>4.7</version>
</dependency>

Я не уверен после добавления зависимости antlrпочему в maven списке зависимостей еще это antlr-runtime-4.5.3.jar. Посмотрите на снимок экрана ниже.

enter image description here

Может кто-нибудь помочь мне, что я здесь делаю неправильно?

...