Apache Spark + Java: «java .lang.AssertionError: сбой подтверждения» в ExpressionEncoder - PullRequest
0 голосов
/ 05 апреля 2020

Я пытаюсь вызвать groupByKey для набора данных, используя следующий код:

    SparkSession SPARK_SESSION = new SparkSession(new SparkContext("local", "app"));
    JavaSparkContext JAVA_SPARK_CONTEXT = new JavaSparkContext(SPARK_SESSION.sparkContext());

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    class Chunk implements Serializable {
        private Integer id;
        private String letters;
    }

    class JavaAggregator extends Aggregator<Chunk, String, String> {

        @Override
        public String zero() {
            return "";
        }

        @Override
        public String reduce(String b, Chunk a) {
            return b + a.getLetters();
        }

        @Override
        public String merge(String b1, String b2) {
            return b1 + b2;
        }

        @Override
        public String finish(String reduction) {
            return reduction;
        }

        @Override
        public Encoder<String> bufferEncoder() {
            return Encoders.bean(String.class);
        }

        @Override
        public Encoder<String> outputEncoder() {
            return Encoders.bean(String.class);
        }
    }

    List<Chunk> chunkList = List.of(
            new Chunk(1, "a"), new Chunk(2, "1"), new Chunk(3, "-*-"),
            new Chunk(1, "b"), new Chunk(2, "2"), new Chunk(3, "-**-"),
            new Chunk(1, "c"), new Chunk(2, "3"), new Chunk(3, "-***-"));

    Dataset<Row> df = SPARK_SESSION.createDataFrame(JAVA_SPARK_CONTEXT.parallelize(chunkList), Chunk.class);
    Dataset<Chunk> ds = df.as(Encoders.bean(Chunk.class));

    KeyValueGroupedDataset<Integer, Chunk> grouped = ds.groupByKey((Function1<Chunk, Integer>)  v -> v.getId(), Encoders.bean(Integer.class));

Но я получаю исключение, которое говорит java .lang.AssertionError: утверждение не удалось

at scala.Predef$.assert(Predef.scala:208)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:87)
at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
at org.apache.spark.sql.Encoders.bean(Encoders.scala)

I Я не эксперт по scala внутренним компонентам, и мне трудно сказать, что не так с кодом, так как исключение выдается некоторым скомпилированным кодом, а сообщение с утверждением "утверждение не выполнено" не очень полезно. Но, может быть, есть что-то в корне неверное в том, что я сделал в своем коде, которое вызывает это исключение?

1 Ответ

0 голосов
/ 07 апреля 2020

Я обнаружил, что проблема была в использовании энкодеров. Я должен был использовать Encoders.INT () вместо Encoders.bean (Integer.class)

...