Я пытаюсь вызвать groupByKey для набора данных, используя следующий код:
SparkSession SPARK_SESSION = new SparkSession(new SparkContext("local", "app"));
JavaSparkContext JAVA_SPARK_CONTEXT = new JavaSparkContext(SPARK_SESSION.sparkContext());
@Data
@NoArgsConstructor
@AllArgsConstructor
class Chunk implements Serializable {
private Integer id;
private String letters;
}
class JavaAggregator extends Aggregator<Chunk, String, String> {
@Override
public String zero() {
return "";
}
@Override
public String reduce(String b, Chunk a) {
return b + a.getLetters();
}
@Override
public String merge(String b1, String b2) {
return b1 + b2;
}
@Override
public String finish(String reduction) {
return reduction;
}
@Override
public Encoder<String> bufferEncoder() {
return Encoders.bean(String.class);
}
@Override
public Encoder<String> outputEncoder() {
return Encoders.bean(String.class);
}
}
List<Chunk> chunkList = List.of(
new Chunk(1, "a"), new Chunk(2, "1"), new Chunk(3, "-*-"),
new Chunk(1, "b"), new Chunk(2, "2"), new Chunk(3, "-**-"),
new Chunk(1, "c"), new Chunk(2, "3"), new Chunk(3, "-***-"));
Dataset<Row> df = SPARK_SESSION.createDataFrame(JAVA_SPARK_CONTEXT.parallelize(chunkList), Chunk.class);
Dataset<Chunk> ds = df.as(Encoders.bean(Chunk.class));
KeyValueGroupedDataset<Integer, Chunk> grouped = ds.groupByKey((Function1<Chunk, Integer>) v -> v.getId(), Encoders.bean(Integer.class));
Но я получаю исключение, которое говорит java .lang.AssertionError: утверждение не удалось
at scala.Predef$.assert(Predef.scala:208)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:87)
at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
at org.apache.spark.sql.Encoders.bean(Encoders.scala)
I Я не эксперт по scala внутренним компонентам, и мне трудно сказать, что не так с кодом, так как исключение выдается некоторым скомпилированным кодом, а сообщение с утверждением "утверждение не выполнено" не очень полезно. Но, может быть, есть что-то в корне неверное в том, что я сделал в своем коде, которое вызывает это исключение?