Я пишу пользовательский источник данных, который преобразует ответ, отправленный моим вызовом rest, т.е. JSON, в набор данных.Я использую jsonPath для извлечения обязательных полей из json и создания схемы, а также данных.Я использую buildscan черты сканирования таблицы для преобразования данных в rdd.Но почему-то мой метод buildscan вызывается несколько раз, и в конце я получаю пустой набор данных.Может кто-нибудь помочь с тем, почему buildscan вызывается несколько раз и как его предотвратить.
В локальной системе - Mac работает нормально, но при работе в кластерном env, несколько раз вызывает buildscan, который переопределяет набор данных истановится пустым
public StructType schema() {
if(schema == null){
schema =createSchema();
}
return schema;
}
@Override
public RDD<Row> buildScan() {
List<List<Object>> table = createRowSet();
@SuppressWarnings("resource")
JavaSparkContext sparkContext = new JavaSparkContext(sqlContext.sparkContext());
JavaRDD<Row> rowRDD = sparkContext.parallelize(table)
.map(row -> RowFactory.create(row.toArray()));
return rowRDD.rdd();
}
private StructType createSchema() {
List<StructField> fields = new ArrayList<>();
if (jsonPathData.size() == 0) {
return null;
} else {
for (int i = 0; i < jsonPathData.size(); i++) {
fields.add(DataTypes.createStructField(jsonPathData.get(i).replace('.','_'), DataTypes.StringType, true));
}
}
return DataTypes.createStructType(fields);
}
/**
* Builds the data table that will then be turned into a RDD.
*/
private List<List<Object>> createRowSet() {
List<Map<String, Object>> records = jsonContext.read(jsonPathArray);
List<List<Object>> data = new ArrayList<>();
if(records == null){
return data;
}
logger.logDebugMessage("Records "+records.size());
for (int i = 0; i < records.size(); i++) {
List<Object> rowData = new ArrayList<>();
Map<String, Object> map1 = records.get(i);
jsonContext = JsonPath.parse(map1,
Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));
for (int j = 0; j < jsonPathData.size(); j++) {
rowData.add(jsonContext.read("$." + jsonPathData.get(j)));
}
data.add(i,rowData);
}
return data;
}
Я ожидаю, что набор данных покажет значения таблицы.В локальной системе он показывает набор данных, но при работе в кластерном окружении он вызывает buildscan несколько раз