Spark sql создать набор данных из строки - PullRequest
0 голосов
/ 23 марта 2020

У меня есть строка, из которой я хотел бы создать набор данных, строка \n отделена для строк и \t отделена для полей:
8 "SOMETHING" 15236236 "2" "SOMETHING" "SOMETHHING"
Поэтому я разделил строку на \n и создайте List<String> из него, затем я создаю JavaRDD с использованием экземпляра JavaSparkContext, затем я пытаюсь создать свой набор данных с помощью метода sqlContet createDataset.
Это прекрасно компилируется, и если я помещаю точка останова в операторе возврата метода loadDataset (), я вижу набор данных settingsDataset, он прерывается только после того, как код вызывает первое действие.

Я пытаюсь добиться этого следующим образом:

private Dataset<Row> loadDataset(){
    InputStream in;
    Dataset<Row> settingsDataset = null;
    try {
      JavaSparkContext jsc = new JavaSparkConte xt(session.sparkContext());
      in = getClass().getResourceAsStream("filename.tsv");
      String settingsFileAsString = IOUtils.toString(in, Charsets.UTF_8);
      List<String> settingsFileAsList = Arrays.asList(settingsFileAsString.split("\n"));
      Encoder<Row> encoder = RowEncoder.apply(getSchema());
      JavaRDD settingsFileAsRDD = jsc.parallelize(settingsFileAsList);
      settingsDataset = session.sqlContext().createDataset(settingsFileAsRDD.rdd(), encoder).toDF();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return settingsDataset;
 }

  private org.apache.spark.sql.types.StructType getSchema() {
    return DataTypes.createStructType(new StructField[]{
        DataTypes.createStructField("f_1", DataTypes.StringType, true),
        DataTypes.createStructField("f_2", DataTypes.StringType, true),
        DataTypes.createStructField("f_3", DataTypes.StringType, true),
        DataTypes.createStructField("f_4", DataTypes.StringType, true),
        DataTypes.createStructField("f_5", DataTypes.StringType, true),
        DataTypes.createStructField("f_6", DataTypes.StringType, true)
    });
  }

Проблема в том, что DAG не может быть создан, код прерывается со следующим исключением: ! java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row

1 Ответ

1 голос
/ 23 марта 2020

В действительности JavaRDD settingsFileAsRDD = jsc.parallelize(settingsFileAsList); равно JavaRDD<String>, но должно быть JavaRDD<Row>. Вы должны разделить эти «линии» на \t и создать из них Row, используя RowFactory.create(s.split("\t")). См. Пример ниже:

SparkSession spark = SparkSession.builder().master("local").getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
String settingsFileAsString = "1\t2\t3\t4\t5\t6\n7\t8\t9\t10\t11\t12";
List<String> settingsFileAsList = Arrays.asList(settingsFileAsString.split("\n"));
Encoder<Row> encoder = RowEncoder.apply(getSchema());
JavaRDD<Row> settingsFileAsRDD = jsc.parallelize(settingsFileAsList).map(s->RowFactory.create(s.split("\t")));
Dataset<Row> settingsDataset = spark.createDataset(settingsFileAsRDD.rdd(), encoder).toDF();
settingsDataset.show();

результат:

+---+---+---+---+---+---+
|f_1|f_2|f_3|f_4|f_5|f_6|
+---+---+---+---+---+---+
|  1|  2|  3|  4|  5|  6|
|  7|  8|  9| 10| 11| 12|
+---+---+---+---+---+---+
...