Как загрузить и обработать несколько CSV-файлов из каталога DBFS с помощью Spark - PullRequest
0 голосов
/ 01 марта 2019

Я хочу запустить следующий код для каждого файла, который я читаю из DBFS (Databricks FileSystem).Я проверил это на всех файлах, которые находятся в папке, но я хочу сделать аналогичные вычисления для каждого файла в папке, один за другим:

// a-e are calculated fields
val df2=Seq(("total",a,b,c,d,e)).toDF("file","total","count1","count2","count3","count4")

//schema is now an empty dataframe
val final1 = schema.union(df2)

Возможно ли это?Я полагаю, что чтение из dbfs должно быть сделано иначе, чем то, что я делаю сейчас:

val df1 = spark
      .read
      .format("csv")
      .option("header", "true")
      .option("delimiter",",")
      .option("inferSchema", "true")
      .load("dbfs:/Reports/*.csv")
      .select("lot of ids")

Заранее большое спасибо за идеи :)

1 Ответ

0 голосов
/ 03 марта 2019

Как уже говорилось, у вас есть 3 варианта здесь.

В моем примере я использую следующие 3 набора данных:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|1   |100 |200 |
|2   |300 |400 |
+----+----+----+

+----+----+----+
|col1|col2|col3|
+----+----+----+
|3   |60  |80  |
|4   |12  |100 |
|5   |20  |10  |
+----+----+----+

+----+----+----+
|col1|col2|col3|
+----+----+----+
|7   |20  |40  |
|8   |30  |40  |
+----+----+----+

Вы сначала создаете свою схему (быстрее определить схему, а не выводить ее):

import org.apache.spark.sql.types._

val df_schema =
  StructType(
    List(
        StructField("col1", IntegerType, true),
        StructField("col2", IntegerType, true),
        StructField("col3", IntegerType, true)))

Опция 1:

Загрузка всех CSV одновременно с помощью:

val df1 = spark
      .read
      .option("header", "false")
      .option("delimiter", ",")
      .option("inferSchema", "false")
      .schema(df_schema)
      .csv("file:///C:/data/*.csv")

Затем примените свою логику ко всей группе наборов данных по имени файла.

Условие : Вы должны найти способ добавить имя файла к каждому файлу

Опция 2:

Загрузка CSV-файлов из каталога.Затем переберите файлы и создайте информационный кадр для каждого CSV.Внутри цикла примените свою логику к каждому CSV.Наконец, в конце цикла добавьте (объедините) результаты во второй кадр данных, в котором будут храниться ваши накопленные результаты.

Внимание: Имейте в виду, что большое количество файлов может привести к очень большой DAG и, как следствие, к огромному плану выполнения, во избежание этого вы можете сохранить текущие результаты или сбор вызовов,В приведенном ниже примере я предполагал, что persist или collect будет выполняться для каждой итерации bufferSize.Вы можете настроить или даже удалить эту логику в соответствии с количеством CSV-файлов.

Это пример кода для 2nd option :

import java.io.File
import org.apache.spark.sql.Row
import spark.implicits._

val dir = "C:\\data_csv\\"
val csvFiles = new File(dir).listFiles.filter(_.getName.endsWith(".csv"))

val bufferSize = 10
var indx = 0
//create an empty df which will hold the accumulated results
var bigDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], df_schema)
csvFiles.foreach{ path => 
    var tmp_df = spark
                  .read
                  .option("header", "false")
                  .option("delimiter", ",")
                  .option("inferSchema", "false")
                  .schema(df_schema)
                  .csv(path.getPath)

    //execute your custom logic/calculations with tmp_df

    if((indx + 1) % bufferSize == 0){
        // If buffer size reached then
        // 1. call unionDf.persist() or unionDf.collect()
        // 2. in the case you use collect() load results into unionDf again 
    }

    bigDf = bigDf.union(tmp_df)
    indx = indx + 1
}
bigDf.show(false)

Это должно привести к выводу:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|1   |100 |200 |
|2   |300 |400 |
|3   |60  |80  |
|4   |12  |100 |
|5   |20  |10  |
|7   |20  |40  |
|8   |30  |40  |
+----+----+----+

Вариант 3:

Последний вариант - использовать встроенную функцию spark.sparkContext.wholeTextFiles.

Это код для загрузки всех файлов CSV в RDD:

val data = spark.sparkContext.wholeTextFiles("file:///C:/data_csv/*.csv")
val df = spark.createDataFrame(data)

df.show(false)

И вывод:

+--------------------------+--------------------------+
|_1                        |_2                        |
+--------------------------+--------------------------+
|file:/C:/data_csv/csv1.csv|1,100,200                 |
|                          |2,300,400                 |
|file:/C:/data_csv/csv2.csv|3,60,80                   |
|                          |4,12,100                  |
|                          |5,20,10                   |
|file:/C:/data_csv/csv3.csv|7,20,40                   |
|                          |8,30,40                   |
+--------------------------+--------------------------+

spark.sparkContext.wholeTextFiles вернет ключ / значениеСДР, в которой ключом является путь к файлу, а значением - данные файла.

Требуется дополнительный код для извлечения содержимого _2, которое является содержимым каждого CSV.По моему мнению, это может быть связано с дополнительными затратами на производительность и ремонтопригодность программы, поэтому я бы ее избежал.

Дайте мне знать, если вам нужны дополнительные разъяснения

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...