Spark SQL Java GenericRowWithSchema не может быть приведен к java.lang.String - PullRequest
0 голосов
/ 15 ноября 2018

У меня есть приложение, которое пытается прочитать группу csv из директории кластера и записать их в виде файла паркета с помощью Spark.

SparkSession sparkSession = createSession();
    JavaRDD<Row> entityRDD = sparkSession.read()
            .csv(dataCluster + "measures/measures-*.csv")
            .javaRDD()
            .mapPartitionsWithIndex(removeHeader, false)
            .map((Function<String, Measure>) s -> {
                String[] parts = s.split(COMMA);
                Measure measure = new Measure();
                measure.setCobDate(parts[0]);
                measure.setDatabaseId(Integer.valueOf(parts[1]));
                measure.setName(parts[2]);

                return measure;
            });

    Dataset<Row> entityDataFrame = sparkSession.createDataFrame(entityRDD, Measure.class);
    entityDataFrame.printSchema();

    //Create parquet file here
    String parquetDir = dataCluster + "measures/parquet/measures";
    entityDataFrame.write().mode(SaveMode.Overwrite).parquet(parquetDir);


    sparkSession.stop();

Класс Measure - это простой POJO, который реализует Serializable.Схема напечатана, поэтому при преобразовании записей DataFrame в файл паркета может возникнуть проблема.Вот ошибка, которую я получаю:

Lost task 2.0 in stage 1.0 (TID 3, redlxd00006.nomura.com, executor 1): org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.String
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:244)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
        ... 8 more

В конечном итоге я собираюсь использовать Spark SQL для фильтрации и объединения данных с другими CSV-файлами, содержащими другие данные таблиц, и записывать все результаты в паркет.Я нашел только вопросы, связанные со скалами, которые не решили мою проблему.Любая помощь очень ценится.

CSV:

cob_date, database_id, name
20181115,56459865,name1
20181115,56652865,name6
20181115,56459845,name32
20181115,15645936,name3

Ответы [ 2 ]

0 голосов
/ 15 ноября 2018

Добавление toDF () и обновление лямбды карты, как предложено Сержем, решило мою проблему:

SparkSession sparkSession = createSession();
JavaRDD<Row> entityRDD = sparkSession.read()
     .csv(prismDataCluster + "measures/measures-*chop.csv")
     .toDF("cobDate","databaseId","name")
     .javaRDD()
     .mapPartitionsWithIndex(removeHeader, false)
     .map((Function<Row, Measure>) row -> {
              Measure measure = new Measure();
              measure.setCobDate(row.getString(row.fieldIndex("cobDate")));
              measure.setDatabaseId(row.getString(row.fieldIndex("databaseId")));
              measure.setName(row.getString(row.fieldIndex("name")));

TVM.

0 голосов
/ 15 ноября 2018
.map((Function<String, Measure>) s -> {

Похоже, здесь должно быть

.map((Function<Row, Measure>) s -> {
...