Чтение файла CSV и цикл на основе фильтра - PullRequest
0 голосов
/ 30 сентября 2019

У меня есть файл csv в хранилище BLOB-объектов Azure, в котором содержатся подробности, приведенные ниже.

Исходя из значения ИСТИНА / ЛОЖЬ, я должен взять год / месяц и передать его в качестве параметра, чтобы найти исходную папку для операции копирования как часть. строки "Папка \ Год \ Месяц * .csv".

Передайте эти параметры в виде исходной строки в цикле, чтобы собрать файлы, присутствующие в папке, и вставить их в папку назначения.

Я ХОЧУ ПОЛУЧИТЬ значения в цикле, чтобы получить исходную строку и передать как переменную. Мне не нужно обновлять CSV с новым столбцом «Foldercolumn» или создавать новый фрейм данных на основе всех записей.

+-------------+--------------+--------------------+-----------------+--------+
|Calendar_year|Calendar_month|EDAP_Data_Load_Statu|lake_refined_date|isreload|
+-------------+--------------+--------------------+-----------------+--------+
|         2018|            12|                HIST|         20190829|   FALSE|
|         2019|             1|                HIST|         20190829|   FALSE|
|         2019|             2|                HIST|         20190829|   FALSE|
|         2019|             3|                HIST|         20190829|    TRUE|
|         2019|             4|                HIST|         20190829|   FALSE|
|         2019|             5|                HIST|         20190829|    TRUE|
|         2019|            11|                HIST|         20190829|   FALSE|
+-------------+--------------+--------------------+-----------------+--------+

Ниже приведен мой искровой код для вышеуказанного требования

val destinationContainerPath= "Finance/Data"
val dfCSVLogs = readCSV(s"$destinationContainerPath/sourcecsv.csv")

val dfTRUEcsv = dfCSVLogs.select(dfCSVLogs.col("*")).filter("isreload =='TRUE'")

получить согласованную строку для каждого столбца

IF isreload =='TRUE' 
                    strFoldercolumn Calendar_month 
                     strFoldercolumn =     2019/03
                   strFoldercolumn =     2019/05

      end if
this is by default get the max value and get the parameter of max value

             var Foldercolumn  max(Calendar_year ),max(Calendar_month )
                       strFoldercolumn =     2019/11

я должен выполнить цикл для каждогоstrFoldercolumn и собирать данные из файла и вставлять их в другое место назначения в BLOB-хранилище

1 Ответ

0 голосов
/ 30 сентября 2019
    //read input control CSV file 
    scala> val df = spark.read.format("csv").option("header", "true").load("file.csv")
    scala> df.show(false)
    +-------------+--------------+--------------------+-----------------+--------+
    |Calendar_year|Calendar_month|EDAP_Data_Load_Statu|lake_refined_date|isreload|
    +-------------+--------------+--------------------+-----------------+--------+
    |2018         |12            |HIST                |20190829         |FALSE   |
    |2019         |2             |HIST                |20190829         |FALSE   |
    |2019         |3             |HIST                |20190829         |TRUE    |
    |2019         |4             |HIST                |20190829         |FALSE   |
    |2019         |11            |HIST                |20190829         |FALSE   |
    |2019         |5             |HIST                |20190829         |TRUE    |
    +-------------+--------------+--------------------+-----------------+--------+
    //initialize variable for max year and month 
    //note: below execution cam be modified on the basis of your requirement simply use filter to get max of particular condition

    scala> val maxYearMonth =  df.select(struct(col("Calendar_year").cast("Int"), col("Calendar_month").cast("Int")) as "ym").agg(max("ym") as "max").selectExpr("stack(1,max.col1,max.col2) as (year, month)").select( concat(col("year"), lit("/") ,col("month"))).rdd.collect.map( r => r(0)).mkString
           res56: maxYearMonth = 2019/11

    //Adding column temparary in input DataFrame
    scala> val df2 = df.withColumn("strFoldercolumn", when(col("isreload") === "TRUE", concat(col("Calendar_year"), lit("/"),col("Calendar_month"))).otherwise(lit(maxYearMonth)))
    scala> df2.show(false)
    +-------------+--------------+--------------------+-----------------+--------+-----------+
    |Calendar_year|Calendar_month|EDAP_Data_Load_Statu|lake_refined_date|isreload|strFoldercolumn|
    +-------------+--------------+--------------------+-----------------+--------+-----------+
    |2018         |12            |HIST                |20190829         |FALSE   |2019/11    |
    |2019         |2             |HIST                |20190829         |FALSE   |2019/11    |
    |2019         |3             |HIST                |20190829         |TRUE    |2019/3     |
    |2019         |4             |HIST                |20190829         |FALSE   |2019/11    |
    |2019         |11            |HIST                |20190829         |FALSE   |2019/11    |
    |2019         |5             |HIST                |20190829         |TRUE    |2019/5     |
    +-------------+--------------+--------------------+-----------------+--------+-----------+


    //move value of column strFoldercolumn into strFoldercolumn list variable 
    scala> val strFoldercolumn = df2.select("strFoldercolumn").distinct.rdd.collect.toList
    strFoldercolumn: List[org.apache.spark.sql.Row] = List([2019/5], [2019/11], [2019/3])

    //lopping each value
    scala>strFoldercolumn.foreach { x =>
         | val csvPath =  "folder/" + x.toString + "/*.csv"
         | val srcdf = spark.read.format("csv").option("header", "true").load(csvPath)
         | // Write logic to copy or write srcdf to your destination folder
         | 
         | }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...