java.lang.RuntimeException: ошибка при кодировании: java.lang.ArrayIndexOutOfBoundsException: 1 - PullRequest
1 голос
/ 11 октября 2019

Я получил сообщение об ошибке, когда пытался объединить два набора данных из базы данных и CSV-файла, сообщение об ошибке выглядит так:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 1 times, most recent failure: Lost task 0.0 in stage 14.0 (TID 14, localhost, executor driver): java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 1
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, targetString), StringType), true, false) AS targetString#205
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, deviceName), StringType), true, false) AS deviceName#206
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, alarmDetectionCode), StringType), true, false) AS alarmDetectionCode#207
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write

Выглядело как несовпадение, когда приложение spark соединяло два набора данныхс другой схемой, но я не знаю, как это случилось. Мой код Java, как это:

Dataset result = null;
result = deviceInfoDataset.join(searchInfo,deviceInfoDataset.col("deviceName").equalTo(searchInfo.col("deviceName")));
result.show();

Схема набора данных:

device
+--------+----------+----------+
|ctgry_cd|deviceInfo|deviceName|
+--------+----------+----------+
searchinfo
+------------+----------+------------------+
|targetString|deviceName|alarmDetectionCode|
+------------+----------+------------------+

1 Ответ

0 голосов
/ 18 октября 2019

Этот вопрос кажется более сложным, чем я думал. И причин две в моем случае. 1.В моем наборе данных есть пустая строка, пришедшая из csv. В этом случае я могу создать и показать этот набор данных со следующим кодом:

SparkSession ss = sparkContextManager.createThreadLocalSparkSession(functionId);
JavaSparkContext jsc = new JavaSparkContext(ss.sparkContext());
for (String fieldName : columns) {
     StructField field = DataTypes
                 .createStructField(fieldName, DataTypes.StringType, true);
     fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
List<String[]> tmpContent = LocalFileUtilsCustomize.readCsv(tempPath);
List<Row> content = jsc.parallelize(tmpContent).map(l -> RowFactory.create((Object[])l)).collect();
Dataset<Row> searchInfo= ss.createDataFrame(content,schema);
searchInfo.show();

Но когда я попытался объединить два набора данных и показать их,Я получил эту ошибку. Затем я попытался удалить пустые строки, но все равно получил ошибку. По крайней мере, я понял, что должен убедиться, что во всех строках csv одинаковое количество столбцов со схемой, даже если я установил nullable = true. Таким образом, решение этого вопроса выглядит так:

SparkSession ss = sparkContextManager.createThreadLocalSparkSession(functionId);
JavaSparkContext jsc = new JavaSparkContext(ss.sparkContext());
for (String fieldName : columns) {
    StructField field = DataTypes
                .createStructField(fieldName, DataTypes.StringType, true);
    fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
List<String[]> tmpContent = LocalFileUtilsCustomize.readCsv(tempPath);
List<Row> content = new ArrayList<>();
for(String[] s :tmpContent) {
                   Row r = null;
                   if(s[0].isEmpty() ) {
                       continue;
                   }
                   if(s.length < columns.size()) {
                       String[] tmpS = new String[columns.size()];
                       System.arraycopy(s, 0, tmpS, 0, s.length);
                       r = RowFactory.create((Object[])tmpS);
                   }else {
                       r = RowFactory.create((Object[])s);
                   }
                   content.add(r);
               }
Dataset<Row> searchInfo= ss.createDataFrame(content,schema);
searchInfo.show();
Dataset result = null;
result = deviceInfoDataset.join(searchInfo,deviceInfoDataset.col("deviceName").equalTo(searchInfo.col("deviceName")));
result.show();

...