Как добавить несколько столбцов со значениями из вложенного массива в azure-databrick - PullRequest
0 голосов
/ 10 мая 2019

Я пытаюсь преобразовать данные аналитического клика Google в определенный формат на основе данных каждого сеанса.Возникла проблема с разнесением / удалением данных из массива в массиве.

Я создал статический фрейм данных, чтобы протестировать и попробовать

    case class CustomDimensions(index: String, value: String)
    case class Hits(hitNumber: String, hour: String, minute: String, customDimensions: Seq[CustomDimensions])
    case class Session(date: String, fullVisitorId: String, visitNumber: String, hits: Seq[Hits])

    val cd10 = new CustomDimensions("16", "AE456")
    val cd11= new CustomDimensions("55", "12345")
    val cd12= new CustomDimensions("58", "West")
    val cd13 = new CustomDimensions("16", "AE456")
    val cd14= new CustomDimensions("55", "12345")
    val cd15= new CustomDimensions("58", "West")

    val cd20 = new CustomDimensions("16", "HJ956")
    val cd21 = new CustomDimensions("55", "56432")
    val cd22 = new CustomDimensions("58", "North")
    val cd23 = new CustomDimensions("16", "HJ956")
    val cd24 = new CustomDimensions("55", "56432")
    val cd25 = new CustomDimensions("58", "North")

    val cd30 = new CustomDimensions("16", "QW345")
    val cd31 = new CustomDimensions("55", "87665")
    val cd32 = new CustomDimensions("58", "West")
    val cd33 = new CustomDimensions("16", "QW345")
    val cd34 = new CustomDimensions("55", "87665")
    val cd35 = new CustomDimensions("58", "West")

    val cd40 = new CustomDimensions("16", "JK540")
    val cd41 = new CustomDimensions("55", "90223")
    val cd42 = new CustomDimensions("58", "South")
    val cd43 = new CustomDimensions("16", "JK540")
    val cd44 = new CustomDimensions("55", "90223")
    val cd45 = new CustomDimensions("58", "South")

    val hits10 = new Hits("1", "8", "13", Seq(cd10, cd11, cd12))
    val hits11 = new Hits("2", "8", "19", Seq(cd13, cd14, cd15))

    val hits20 = new Hits("1", "9", "23", Seq(cd20, cd21, cd22))
    val hits21 = new Hits("2", "9", "29", Seq(cd23, cd24, cd25))

    val hits30 = new Hits("1", "10", "33", Seq(cd30, cd31, cd32))
    val hits31 = new Hits("2", "10", "39", Seq(cd33, cd34, cd35))

    val hits40 = new Hits("1", "11", "43", Seq(cd40, cd41, cd42))
    val hits41 = new Hits("2", "11", "49", Seq(cd43, cd44, cd45))

    val session1 = new Session("2019-05-02", "12345", "1", Seq(hits10, hits11))
    val session2 = new Session("2019-05-02", "12346", "1", Seq(hits20, hits21))
    val session3 = new Session("2019-05-02", "12347", "1", Seq(hits30, hits31))
    val session4 = new Session("2019-05-02", "12348", "1", Seq(hits40, hits41))

Это вложено во вложенный файл:

    session 1
     - hits 1
       - customdimension a
       - customdimension b
     - hits 2
       - customdimension c
       - customdimension d
    session 2
      - hits 3
       - customdimension e
       - customdimension f
    ... etc

Схему сеанса GA можно найти здесьhttps://support.google.com/analytics/answer/3437719?hl=en

и из этого мне нужно получить результат

fullVisitorId, visitNumber, hits.hitNumber, hits.hour, hits.minute, и для каждой строки и уникального значения получите ОДИН специфичный CustomDimension (т.е. index = 16) в качестве нового столбца.Как fullVisitorId, visitNumber, hits.hitNumber, hits.hour, hits.minute, customDimension_16, customDimension_58 и т. Д.

Это мое начало

    val flattenDF = df
      .select($"date", $"fullVisitorId", $"visitNumber", 
    functions.explode($"hits") as "hits")
      .select($"date", $"fullVisitorId", $"visitNumber", $"hits.hitNumber" as "hitNumber", $"hits.hour" as "hitHour", $"hits.minute" as "hitMinute")

    flattenDF.printSchema()
    flattenDF.show()

первый .select помогает мне explode первый массив hits и я не хочу взрыватьсямассив (CustomDimension) в пределах hits здесь, потому что он создает строку для каждого customDimension.Мне нужно для каждого попадания (uniqe by hitNumber) получить определенный customDimension (т.е. 16, 58 и т. Д.) И добавить их в виде столбца после той же записи.

Попытка с .withColumn, но не полностью понимаю, как в параметре функции .withColumn ссылаться на столбцы в массиве customDimensions, не разбирая его?!

Я нашелпример из документации по документам о выравнивании вложенных столбцов,

    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    implicit class DataFrameFlattener(df: DataFrame) {
      def flattenSchema: DataFrame = {
       df.select(flatten(Nil, df.schema): _*)
     }

    protected def flatten(path: Seq[String], schema: DataType): Seq[Column] = schema match {
    case s: StructType => s.fields.flatMap(f => flatten(path :+ f.name, f.dataType))
    case other => col(path.map(n => s"`$n`").mkString(".")).as(path.mkString(".")) :: Nil
        }
    }

Я думаю, что это мне помогает, но, похоже, это не помогает мне с частью customDimension в качестве отдельного столбца для каждого индекса.

В T-SQL это будет выглядеть примерно так

    SELECT 
      fullVisitorId,
      visitNumber,
      (SELECT max(case when customdimension.index = 16 then customdimension.value else '' end) as cd_16,
      (SELECT max(case when customdimension.index = NN then customdimension.value else '' end) as cd_NN
    FROM ga_sessions

это не работает T-sql, но чтобы продемонстрировать результат, который я хотел бы получить с блоком данных databricks.

Любая помощь в настройке моего в правильном направлении, и любые ссылки были бы великолепны!

...