В любом случае есть ли в javaAPI набор данных <Row>к карте () и вернуть набор данных <Row>? - PullRequest
0 голосов
/ 22 апреля 2020

Я использую spark- sql -2.4.1v с Java 8. У меня есть вариант использования, как показано ниже,

Dataset<Row> ds = //a Dataset<Row> read from DB

Мне нужно изменить, чтобы сделать некоторые манипуляции на основе записей другого набора данных, т.е.

List<String> codesList = Array.asList("code1","code2")
Dataset<Row> codes = sc.createDataSet(codesList , Encoders.bean(String.class))

Мне нужно обрабатывать весь код параллельно. Чтобы сделать то же самое, я пытаюсь, как показано ниже:

 Dataset<Row> ds_res =  codes.map( x_cod ->   //map throwing an error
        calcFunction(sparkSession, filePath, ds ,x_cod );
    }).reduce(new Function2<Dataset<Row> df1,Dataset<Row> df2) => df1.union(df2))

 ds_res .write().path(filePath).mode("append").save();

    public static Dataset<Row> calcFunction(sparkSession, filePath, ds ,x_cod ){
         //some complex calculation based on x_cod 

        return ds_res ; // return ds_res  for further processing
    }

Как заставить это работать параллельно на кластере?

1 Ответ

1 голос
/ 23 апреля 2020

Список кодирования в наборе данных является более выполнимым вариантом, чем кодирование. Если вы планируете использовать класс EJB, вы можете закодировать его для этого типа Dataset<T>

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

public class ParallelizeArray {

    public static void main(String[] args) {
        final SparkSession sparkSess = Constant.getSparkSess();
        List<String> codesList = Arrays.asList("code1", "code2");
        final Dataset<String> dataFrame = sparkSess.createDataset(codesList, Encoders.STRING());
        dataFrame.write().mode(SaveMode.Append).csv("src/main/resources/paraArray");
    }
}

или использовать

final Encoder<Dataset> bean = Encoders.bean(Dataset.class);
Dataset<Row> ds_res = codes.map((MapFunction<String, Dataset>) x_cod -> calcFunction(sparkSess, filePath, ds ,x_cod),bean)
                .reduce((ReduceFunction<Dataset>) (df1, df2) -> df1.union(df2));



    public static Dataset<Row> calcFunction(SparkSession sparkSession, String filePath, Dataset<Row> ds ,String x_cod ){
        Dataset<Row> ds_res = null;//some complex calculation based on x_cod
        return ds_res ; // return ds_res  for further processing
    }
...