Я использую соединитель hbase-spark для извлечения данных hbase в искру JavaRDD<Row>
(что, по моему мнению, я могу сделать успешно, так как я могу печатать полученные данные hbase).Затем я пытаюсь преобразовать это JavaRDD<Row>
в Dataset<Row>
.Но это дает мне ошибку, которая приведена далее в посте.Сначала позвольте мне начать, как выглядит мой код.
private static JavaRDD<Row> loadHBaseRDD() throws ParseException
{
//form list of row keys
List<byte[]> rowKeys = new ArrayList<byte[]>(5);
//consider ids is class level variable
ids.forEach(id -> {
rowKeys.add(Bytes.toBytes(id));
});
JavaRDD<byte[]> rdd = jsc.parallelize(rowKeys);
//make hbase-spark connector call
JavaRDD resultJRDD = jhbc.bulkGet(TableName.valueOf("table1"),2,rdd,new GetFunction(),new ResultFunction());
return resultJRDD;
}
Обратите внимание, что bulkGet()
принимает экземпляры GetFunction
и RsultFunction
классов.У класса GetFunction
есть единственный метод, который возвращает экземпляр класса Get
(из клиент hbase ):
public static class GetFunction implements Function<byte[], Get> {
private static final long serialVersionUID = 1L;
public Get call(byte[] v) throws Exception {
return new Get(v);
}
}
В ResultFunction
есть функция, которая преобразует экземпляр Result
(класс клиента hbase) в Row
:
public static class ResultFunction implements Function<Result, Row>
{
private static final long serialVersionUID = 1L;
public Row call(Result result) throws Exception
{
List<String> values = new ArrayList<String>(); //notice this is arraylist, we talk about this latter
for (Cell cell : result.rawCells()) {
values.add(Bytes.toString(CellUtil.cloneValue(cell)));
}
return RowFactory.create(values);
}
}
Когда я вызываю loadHBaseRDD()
и печатаю возвращаемое значение, он печатает значения правильно:
JavaRDD<Row> hbaseJavaRDD = loadHBaseRDD();
hbaseJavaRDD.foreach(row -> {
System.out.println(row); //this prints rows correctly
});
Это означает, что строки имеютбыл правильно извлечен из Hbase для искры.Теперь я хочу преобразовать JavaRDD<Row>
в Dataset<Row>
, как объяснено здесь .Таким образом, я сначала создаю StructType
:
StructType schema = //create schema
Затем я пытаюсь преобразовать JavaRDD
в фрейм данных:
Dataset<Row> hbaseDataFrame = sparksession1.createDataFrame(hbaseJavaRDD, schema);
hbaseDataFrame.show(false);
Это вызывает исключение с очень большой стековой трассировкой (показана только часть которойниже) происходит в строке hbaseDataFrame.show(false)
с первой строкой следующим образом:
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.ArrayList is not a valid external type for schema of string
Кажется, что, поскольку values
имеет тип ArrayList
внутри ResultFunction.call()
, он дает исключение java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.ArrayList is not a valid external type for schema of string
.
В stackoveflow есть [аналогичный вопрос], в котором answer говорит, что вместо списка следует вернуть String[][]
.Хотя я не понимаю причины возврата String[][]
, я изменил ResultFunction
, чтобы иметь values
типа String[][]
:
public static class ResultFunction implements Function<Result, Row>
{
private static final long serialVersionUID = 1L;
public Row call(Result result) throws Exception
{
String[] values = new String[result.rawCells().length];
String[][] valuesWrapped = new String[1][];
for(int i=0;i<result.rawCells().length;i++)
{
values[i] = Bytes.toString(CellUtil.cloneValue(result.rawCells()[i]));
}
valuesWrapped[0] = values;
return RowFactory.create(valuesWrapped);
}
}
Это дает ниже исключение в той же строке hbaseDataFrame.show(false)
:
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: [[Ljava.lang.String; is not a valid external type for schema of string
Наконец, я снова изменил класс ResultFunction
, чтобы иметь переменную values
типа String[]
:
public static class ResultFunction implements Function<Result, Row>
{
private static final long serialVersionUID = 1L;
public Row call(Result result) throws Exception
{
String[] values = new String[result.rawCells().length];
for(int i=0;i<result.rawCells().length;i++)
{
values[i] = Bytes.toString(CellUtil.cloneValue(result.rawCells()[i]));
}
return values;
}
}
И это дает мне исключение с трассировкой большого стека, имеющей стартовую строку:
java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 14
Так что здесь может пойти не так?И как мне это сделать?