как метод buildscan вызывается внутри в собственном источнике данных spark - PullRequest
0 голосов
/ 10 июня 2019

Я пишу пользовательский источник данных, который преобразует ответ, отправленный моим вызовом rest, т.е. JSON, в набор данных.Я использую jsonPath для извлечения обязательных полей из json и создания схемы, а также данных.Я использую buildscan черты сканирования таблицы для преобразования данных в rdd.Но почему-то мой метод buildscan вызывается несколько раз, и в конце я получаю пустой набор данных.Может кто-нибудь помочь с тем, почему buildscan вызывается несколько раз и как его предотвратить.

В локальной системе - Mac работает нормально, но при работе в кластерном env, несколько раз вызывает buildscan, который переопределяет набор данных истановится пустым

public StructType schema() {
       if(schema == null){
           schema =createSchema();
       }
        return schema;
    }

    @Override
    public RDD<Row> buildScan() {
        List<List<Object>>  table = createRowSet();
        @SuppressWarnings("resource")
        JavaSparkContext sparkContext = new JavaSparkContext(sqlContext.sparkContext());
        JavaRDD<Row> rowRDD = sparkContext.parallelize(table)
                .map(row -> RowFactory.create(row.toArray()));

        return rowRDD.rdd();
    }

    private StructType createSchema() {
        List<StructField> fields = new ArrayList<>();
        if (jsonPathData.size() == 0) {
            return null;
        } else {
            for (int i = 0; i < jsonPathData.size(); i++) {
                fields.add(DataTypes.createStructField(jsonPathData.get(i).replace('.','_'), DataTypes.StringType, true));
            }
        }
        return DataTypes.createStructType(fields);
    }

    /**
     * Builds the data table that will then be turned into a RDD.
     */
    private List<List<Object>> createRowSet() {
        List<Map<String, Object>> records = jsonContext.read(jsonPathArray);

        List<List<Object>> data = new ArrayList<>();
        if(records == null){
            return data;
        }
        logger.logDebugMessage("Records "+records.size());

        for (int i = 0; i < records.size(); i++) {
            List<Object> rowData = new ArrayList<>();
            Map<String, Object> map1 = records.get(i);
            jsonContext = JsonPath.parse(map1,
                    Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));
            for (int j = 0; j < jsonPathData.size(); j++) {
                rowData.add(jsonContext.read("$." + jsonPathData.get(j)));
            }
            data.add(i,rowData);
        }
        return data;
    }

Я ожидаю, что набор данных покажет значения таблицы.В локальной системе он показывает набор данных, но при работе в кластерном окружении он вызывает buildscan несколько раз

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...