Итерация по нескольким CSV и соединение с Spark SQL - PullRequest
1 голос
/ 19 сентября 2019

У меня есть несколько CSV-файлов с одинаковыми заголовками и одинаковыми идентификаторами.Я пытаюсь выполнить итерацию, чтобы объединить все файлы до одного индексированного «31».В моем цикле while я пытаюсь инициализировать объединенный набор данных, чтобы его можно было использовать для оставшейся части цикла.В последней строке мне сообщили, что «объединенная локальная переменная, возможно, не инициализирована».Как я должен вместо этого делать это?

SparkSession spark = SparkSession.builder().appName("testSql")
            .master("local[*]")
            .config("spark.sql.warehouse.dir", "file:///c:tmp")
            .getOrCreate();

Dataset<Row> first = spark.read().option("header", true).csv("mypath/01.csv");
Dataset<Row> second = spark.read().option("header", true).csv("mypath/02.csv");

IntStream.range(3, 31)
    .forEach(i -> {
        while(i==3) {
            Dataset<Row> merged = first.join(second, first.col("customer_id").equalTo(second.col("customer_id")));
            }
        Dataset<Row> next = spark.read().option("header", true).csv("mypath/"+i+".csv");
        Dataset<Row> merged  = merged.join(next, merged.col("customer_id").equalTo(next.col("customer_id")));

1 Ответ

1 голос
/ 20 сентября 2019

РЕДАКТИРОВАНИЕ на основе отзывов в комментариях.

Следуя вашему шаблону, что-то вроде этого будет работать:

Dataset<Row> ds1 = spark.read().option("header", true).csv("mypath/01.csv");
Dataset<?>[] result = {ds1};
IntStream.range(2, 31)
    .forEach(i -> {
        Dataset<Row> next = spark.read().option("header", true).csv("mypath/"+i+".csv");
        result[0] = result[0].join(next, "customer_id");
    });

Мы упаковываем Dataset вмассив, чтобы обойти ограничение на захват переменных в лямбда-выражениях.

Более простой способ для этого конкретного случая - просто использовать цикл for, а не stream.forEach:

Dataset<Row> result = spark.read().option("header", true).csv("mypath/01.csv");
for( int i = 2 ; i < 31 ; i++ ) {
  Dataset<Row> next = spark.read().option("header", true).csv("mypath/"+i+".csv");
  result[0] = result[0].join(next, "customer_id");
};
...