RuntimeException при преобразовании набора данных <Row>в JavaRDD <Row>и затем в Dataframe - PullRequest
0 голосов
/ 18 декабря 2018

Я пытаюсь добавить столбец индекса в набор данных, используя приведенный ниже код, который преобразует его в JavaPairRDD.

// ds is a Dataset<Row>
JavaPairRDD<Row, Long> indexedRDD = ds.toJavaRDD()
    .zipWithIndex();

// Now I am converting JavaPairRDD to JavaRDD as below.
JavaRDD<Row> rowRDD = indexedRDD
    .map(tuple -> RowFactory.create(tuple._1(),tuple._2().intValue()));

// I am converting the RDD back to dataframe and it doesnt work.
Dataset<Row> authDf = session
    .createDataFrame(rowRDD, ds.schema().add("ID", DataTypes.IntegerType));

// Below is the ds schema(Before adding the ID column).
ds.schema()

root
 |-- user: short (nullable = true)
 |-- score: long (nullable = true)
 |-- programType: string (nullable = true)
 |-- source: string (nullable = true)
 |-- item: string (nullable = true)
 |-- playType: string (nullable = true)
 |-- userf: integer (nullable = true)

Приведенный выше код выдает следующее сообщение об ошибке:

**Job aborted due to stage failure: Task 0 in stage 21.0 failed 4 
times, most  recent failure: Lost task 0.3 in stage 21.0 (TID 658, 
sl73caehdn0406.visa.com, executor 1):

java.lang.RuntimeException: 
Error while encoding: java.lang.RuntimeException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema is not 
a valid external type for schema of smallint**

1 Ответ

0 голосов
/ 22 июня 2019

Кортеж, который вы создали во втором выражении, состоит из двух столбцов: один - это объект (который состоит из всех столбцов из исходного набора данных), а второй - целочисленный.Второй столбец кортежа переходит во второй столбец результата, который имеет тип long.Первый столбец кортежа переходит в первый столбец результата, который имеет тип short - будучи объектом, то есть GenericRowWithSchema, это приводит к ошибке.

Вы должны сделать RowFactory.create () с 7 параметрами, по одному для каждогостолбец результата.

...