Повторяющаяся строка кадра данных Spark на основе значения столбца разделения в scala - PullRequest
0 голосов
/ 14 января 2020

У меня есть следующий код в scala:

val  fullCertificateSourceDf = certificateSourceDf         
              .withColumn("Stage", when(col("Data.WorkBreakdownUp1Summary").isNotNull && col("Data.WorkBreakdownUp1Summary")=!="",                                                     rtrim(regexp_extract($"Data.WorkBreakdownUp1Summary","^.*?(?= - *[a-zA-Z])",0))).otherwise(""))
              .withColumn("SubSystem", when(col("Data.ProcessBreakdownSummaryList").isNotNull && col("Data.ProcessBreakdownSummaryList")=!="",                                         regexp_extract($"Data.ProcessBreakdownSummaryList","^.*?(?= - *[a-zA-Z])",0)).otherwise(""))
              .withColumn("System", when(col("Data.ProcessBreakdownUp1SummaryList").isNotNull && col("Data.ProcessBreakdownUp1SummaryList")=!="",                                         regexp_extract($"Data.ProcessBreakdownUp1SummaryList","^.*?(?= - *[a-zA-Z])",0)).otherwise(""))
              .withColumn("Facility", when(col("Data.ProcessBreakdownUp2Summary").isNotNull && col("Data.ProcessBreakdownUp2Summary")=!="",                                         regexp_extract($"Data.ProcessBreakdownUp2Summary","^.*?(?= - *[a-zA-Z])",0)).otherwise(""))
              .withColumn("Area", when(col("Data.ProcessBreakdownUp3Summary").isNotNull && col("Data.ProcessBreakdownUp3Summary")=!="",                                         regexp_extract($"Data.ProcessBreakdownUp3Summary","^.*?(?= - *[a-zA-Z])",0)).otherwise(""))
              .select("Data.ID",
                      "Data.CertificateID",
                      "Data.CertificateTag",
                      "Data.CertificateDescription",
                      "Data.WorkBreakdownUp1Summary",
                      "Data.ProcessBreakdownSummaryList",
                      "Data.ProcessBreakdownUp1SummaryList",
                      "Data.ProcessBreakdownUp2Summary",
                      "Data.ProcessBreakdownUp3Summary",
                      "Data.ActualStartDate",
                      "Data.ActualEndDate",
                      "Data.ApprovedDate",
                      "Data.CurrentState",
                      "DataType",
                      "PullDate",
                      "PullTime",
                      "Stage",
                      "System",
                      "SubSystem",
                      "Facility",
                      "Area"
                     )
                     .filter((col("Stage").isNotNull) && (length(col("Stage"))>0))
                     .filter(((col("SubSystem").isNotNull) && (length(col("SubSystem"))>0)) || ((col("System").isNotNull) && (length(col("System"))>0)) || ((col("Facility").isNotNull) && (length(col("Facility"))>0)) || ((col("Area").isNotNull) && (length(col("Area"))>0))
                      )
                     .select("*")

Этот фрейм данных fullCertificateSourceDf содержит следующие данные:

Original Data

Я скрыл некоторые столбцы для краткости.

Я хочу, чтобы данные выглядели так:

Target Data

Мы разбиваемся на два столбцы: ProcessBreakdownSummaryList и ProcessBreakdownUp1SummaryList. Они оба являются списками, разделенными запятыми.

Обратите внимание, если значения указаны в ProcessBreakdownSummaryList (CS10-100-22-10 - Система подогрева вентилятора приточного воздуха, CS10-100-81 -10 - Mine Service Switchgear) и ProcessBreakdownUp1SummaryList (CS10-100-22 - вентиляция сервисного вала, CS10-100-81 - сервисный вал электрооборудования) - это то же самое, что мы должны разделить только один раз.

Однако, если они отличаются, как в ProcessBreakdownSummaryList (CS10-100-22-10 - Система подогрева вентилятора приточного воздуха, CS10-100-81 -10 - Распределительное устройство Mine Services) и ProcessBreakdownUp1SummaryList (CS10-100-22 - вентиляция сервисного вала, CS10-100-34 - электрооборудование сервисного вала) он должен снова разделиться на третий ряд.

Заранее благодарим за помощь с этим.

1 Ответ

0 голосов
/ 15 января 2020

Вы можете решить это многими способами, я думаю, что самый простой подход для сложной обработки - использовать scala. Вы можете прочитать все столбцы, включая «ProcessBreakdownSummaryList» и «ProcessBreakdownUp1SummaryList», сравнить их значения на предмет одинаковости / отличия и выдать несколько строк для одной строки ввода. Затем отобразите на выходе плоский файл, чтобы получить кадр данных со всеми необходимыми строками.

val fullCertificateSourceDf = // your code

fullCertificateSourceDf.map{ row =>
val id = row.getAs[String]("Data.ID")
... read all columns

val processBreakdownSummaryList = row.getAs[String]("Data.ProcessBreakdownSummaryList")
val processBreakdownUp1SummaryList = row.getAs[String]("Data.ProcessBreakdownUp1SummaryList")

//split processBreakdownSummaryList on ","
//split processBreakdownUp1SummaryList on ","
//compare then for equality 
//lets say you end up with 4 rows.

//return Seq of those 4 rows in a list processBreakdownSummary
//return a List of tuple of strings like List((id, certificateId, certificateTag, ..distinct values of processBreakdownUp1SummaryList...), (...) ...)
//all columns id, certificateId, certificateTag etc are repeated for each distinct value of processBreakdownUp1SummaryList and processBreakdownSummaryList

}.flatMap(identity(_)).toDF("column1","column2"...)

Вот пример разделения одной строки на несколько

    val employees = spark.createDataFrame(Seq(("E1",100.0,"a,b"), ("E2",200.0,"e,f"),("E3",300.0,"c,d"))).toDF("employee","salary","clubs")

    employees.map{ r =>
      val clubs = r.getAs[String]("clubs").split(",")
      for{
        c : String <- clubs
      }yield(r.getAs[String]("employee"),r.getAs[Double]("salary"), c)
    }.flatMap(identity(_)).toDF("employee","salary","clubs").show(false)

Результат выглядит как

+--------+------+-----+
|employee|salary|clubs|
+--------+------+-----+
|E1      |100.0 |a    |
|E1      |100.0 |b    |
|E2      |200.0 |e    |
|E2      |200.0 |f    |
|E3      |300.0 |c    |
|E3      |300.0 |d    |
+--------+------+-----+
...