Как получить доступ к переменной Java Spark Broadcast? - PullRequest
0 голосов
/ 28 августа 2018

Я пытаюсь передать искру Dataset, чтобы получить доступ к ней из функции map. Первый оператор print возвращает первую строку переданного набора данных, как и ожидалось. К сожалению, второй оператор print не возвращает результат. Казнь просто зависает в этой точке. Есть идеи, что я делаю не так?

    Broadcast<JavaRDD<Row>> broadcastedTrainingData = this.javaSparkContext.broadcast(trainingData.toJavaRDD());

    System.out.println("Data:" + broadcastedTrainingData.value().first());
    JavaRDD<Row> rowRDD = this.javaSparkContext.parallelize(stringAsList).map((Integer row) -> {
        System.out.println("Data (map):" + broadcastedTrainingData.value().first());
        return RowFactory.create(row);
    });

Следующий псевдокод выделяет то, чего я хочу достичь. Моя главная цель - передать набор обучающих данных, чтобы я мог использовать его в функции карты.

    public Dataset<Row> getWSSE(Dataset<Row> trainingData, int clusterRange) {
        StructType structType = new StructType();
        structType = structType.add("ClusterAm", DataTypes.IntegerType, false);
        structType = structType.add("Cost", DataTypes.DoubleType, false);

        List<Integer> stringAsList = new ArrayList<>();
        for (int clusterAm = 2; clusterAm < clusterRange + 2; clusterAm++) {
            stringAsList.add(clusterAm);
        }

        Broadcast<Dataset> broadcastedTrainingData = this.javaSparkContext.broadcast(trainingData);

        System.out.println("Data:" + broadcastedTrainingData.value().first());
        JavaRDD<Row> rowRDD = this.javaSparkContext.parallelize(stringAsList).map((Integer row) -> RowFactory.create(row));

        StructType schema = DataTypes.createStructType(new StructField[]{DataTypes.createStructField("ClusterAm", DataTypes.IntegerType, false)});

        Dataset wsse = sqlContext.createDataFrame(rowRDD, schema).toDF();
        wsse.show();

        ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);

        Dataset result = wsse.map(
                (MapFunction<Row, Row>) row -> RowFactory.create(row.getAs("ClusterAm"), new KMeans().setK(row.getAs("ClusterAm")).setSeed(1L).fit(broadcastedTrainingData.value()).computeCost(broadcastedTrainingData.value())),
                encoder);

        result.show();
        broadcastedTrainingData.destroy();
        return wsse;
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...