В Apache Spark преобразование JavaRDD <Row>в набор данных <Row>дает исключение: ArrayList не является допустимым внешним типом для схемы строки - PullRequest
0 голосов
/ 31 мая 2018

Я использую соединитель hbase-spark для извлечения данных hbase в искру JavaRDD<Row> (что, по моему мнению, я могу сделать успешно, так как я могу печатать полученные данные hbase).Затем я пытаюсь преобразовать это JavaRDD<Row> в Dataset<Row>.Но это дает мне ошибку, которая приведена далее в посте.Сначала позвольте мне начать, как выглядит мой код.

private static JavaRDD<Row> loadHBaseRDD() throws ParseException
{
    //form list of row keys
    List<byte[]> rowKeys = new ArrayList<byte[]>(5);
    //consider ids is class level variable
    ids.forEach(id -> {
        rowKeys.add(Bytes.toBytes(id));     
    });
    JavaRDD<byte[]> rdd = jsc.parallelize(rowKeys);

    //make hbase-spark connector call 
    JavaRDD resultJRDD = jhbc.bulkGet(TableName.valueOf("table1"),2,rdd,new GetFunction(),new ResultFunction());

    return resultJRDD;
}

Обратите внимание, что bulkGet() принимает экземпляры GetFunction и RsultFunction классов.У класса GetFunction есть единственный метод, который возвращает экземпляр класса Get (из клиент hbase ):

public static class GetFunction implements Function<byte[], Get> {
    private static final long serialVersionUID = 1L;
    public Get call(byte[] v) throws Exception {
        return new Get(v);
    }
}

В ResultFunction есть функция, которая преобразует экземпляр Result(класс клиента hbase) в Row:

public static class ResultFunction implements Function<Result, Row> 
{
    private static final long serialVersionUID = 1L;
    public Row call(Result result) throws Exception 
    {
        List<String> values = new ArrayList<String>(); //notice this is arraylist, we talk about this latter

        for (Cell cell : result.rawCells()) {
            values.add(Bytes.toString(CellUtil.cloneValue(cell)));
        }
        return RowFactory.create(values);
    }
}

Когда я вызываю loadHBaseRDD() и печатаю возвращаемое значение, он печатает значения правильно:

JavaRDD<Row> hbaseJavaRDD = loadHBaseRDD();
hbaseJavaRDD.foreach(row -> { 
    System.out.println(row);   //this prints rows correctly
}); 

Это означает, что строки имеютбыл правильно извлечен из Hbase для искры.Теперь я хочу преобразовать JavaRDD<Row> в Dataset<Row>, как объяснено здесь .Таким образом, я сначала создаю StructType:

StructType schema = //create schema

Затем я пытаюсь преобразовать JavaRDD в фрейм данных:

Dataset<Row> hbaseDataFrame = sparksession1.createDataFrame(hbaseJavaRDD, schema);
hbaseDataFrame.show(false);

Это вызывает исключение с очень большой стековой трассировкой (показана только часть которойниже) происходит в строке hbaseDataFrame.show(false) с первой строкой следующим образом:

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.ArrayList is not a valid external type for schema of string

Кажется, что, поскольку values имеет тип ArrayList внутри ResultFunction.call(), он дает исключение java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.ArrayList is not a valid external type for schema of string.

В stackoveflow есть [аналогичный вопрос], в котором answer говорит, что вместо списка следует вернуть String[][].Хотя я не понимаю причины возврата String[][], я изменил ResultFunction, чтобы иметь values типа String[][]:

public static class ResultFunction implements Function<Result, Row> 
{
    private static final long serialVersionUID = 1L;
    public Row call(Result result) throws Exception 
    {
        String[] values = new String[result.rawCells().length];
        String[][] valuesWrapped = new String[1][]; 

        for(int i=0;i<result.rawCells().length;i++)
        {
            values[i] = Bytes.toString(CellUtil.cloneValue(result.rawCells()[i]));
        }
        valuesWrapped[0] = values;
        return RowFactory.create(valuesWrapped);
    }
}

Это дает ниже исключение в той же строке hbaseDataFrame.show(false):

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: [[Ljava.lang.String; is not a valid external type for schema of string

Наконец, я снова изменил класс ResultFunction, чтобы иметь переменную values типа String[]:

public static class ResultFunction implements Function<Result, Row>
{
    private static final long serialVersionUID = 1L;
    public Row call(Result result) throws Exception 
    {
        String[] values = new String[result.rawCells().length];     
        for(int i=0;i<result.rawCells().length;i++)
        {
            values[i] = Bytes.toString(CellUtil.cloneValue(result.rawCells()[i]));
        }
        return values;
    }
}

И это дает мне исключение с трассировкой большого стека, имеющей стартовую строку:

java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 14

Так что здесь может пойти не так?И как мне это сделать?

1 Ответ

0 голосов
/ 05 июня 2018

Последний подход (возвращая String[] values) был верным.Проблемы были с плохо сформированной схемой.Кажется, что я каким-то образом получил еще один столбец в схеме, чем присутствует в данных.(Благодаря дополнительному пробелу в строке схемы, содержащей столбцы, разделенные одним пробелом. Дополнительный пробел создает дополнительный столбец.)

...