Я пытаюсь преобразовать данные аналитического клика Google в определенный формат на основе данных каждого сеанса.Возникла проблема с разнесением / удалением данных из массива в массиве.
Я создал статический фрейм данных, чтобы протестировать и попробовать
case class CustomDimensions(index: String, value: String)
case class Hits(hitNumber: String, hour: String, minute: String, customDimensions: Seq[CustomDimensions])
case class Session(date: String, fullVisitorId: String, visitNumber: String, hits: Seq[Hits])
val cd10 = new CustomDimensions("16", "AE456")
val cd11= new CustomDimensions("55", "12345")
val cd12= new CustomDimensions("58", "West")
val cd13 = new CustomDimensions("16", "AE456")
val cd14= new CustomDimensions("55", "12345")
val cd15= new CustomDimensions("58", "West")
val cd20 = new CustomDimensions("16", "HJ956")
val cd21 = new CustomDimensions("55", "56432")
val cd22 = new CustomDimensions("58", "North")
val cd23 = new CustomDimensions("16", "HJ956")
val cd24 = new CustomDimensions("55", "56432")
val cd25 = new CustomDimensions("58", "North")
val cd30 = new CustomDimensions("16", "QW345")
val cd31 = new CustomDimensions("55", "87665")
val cd32 = new CustomDimensions("58", "West")
val cd33 = new CustomDimensions("16", "QW345")
val cd34 = new CustomDimensions("55", "87665")
val cd35 = new CustomDimensions("58", "West")
val cd40 = new CustomDimensions("16", "JK540")
val cd41 = new CustomDimensions("55", "90223")
val cd42 = new CustomDimensions("58", "South")
val cd43 = new CustomDimensions("16", "JK540")
val cd44 = new CustomDimensions("55", "90223")
val cd45 = new CustomDimensions("58", "South")
val hits10 = new Hits("1", "8", "13", Seq(cd10, cd11, cd12))
val hits11 = new Hits("2", "8", "19", Seq(cd13, cd14, cd15))
val hits20 = new Hits("1", "9", "23", Seq(cd20, cd21, cd22))
val hits21 = new Hits("2", "9", "29", Seq(cd23, cd24, cd25))
val hits30 = new Hits("1", "10", "33", Seq(cd30, cd31, cd32))
val hits31 = new Hits("2", "10", "39", Seq(cd33, cd34, cd35))
val hits40 = new Hits("1", "11", "43", Seq(cd40, cd41, cd42))
val hits41 = new Hits("2", "11", "49", Seq(cd43, cd44, cd45))
val session1 = new Session("2019-05-02", "12345", "1", Seq(hits10, hits11))
val session2 = new Session("2019-05-02", "12346", "1", Seq(hits20, hits21))
val session3 = new Session("2019-05-02", "12347", "1", Seq(hits30, hits31))
val session4 = new Session("2019-05-02", "12348", "1", Seq(hits40, hits41))
Это вложено во вложенный файл:
session 1
- hits 1
- customdimension a
- customdimension b
- hits 2
- customdimension c
- customdimension d
session 2
- hits 3
- customdimension e
- customdimension f
... etc
Схему сеанса GA можно найти здесьhttps://support.google.com/analytics/answer/3437719?hl=en
и из этого мне нужно получить результат
fullVisitorId, visitNumber, hits.hitNumber, hits.hour, hits.minute
, и для каждой строки и уникального значения получите ОДИН специфичный CustomDimension (т.е. index = 16) в качестве нового столбца.Как fullVisitorId, visitNumber, hits.hitNumber, hits.hour, hits.minute, customDimension_16, customDimension_58
и т. Д.
Это мое начало
val flattenDF = df
.select($"date", $"fullVisitorId", $"visitNumber",
functions.explode($"hits") as "hits")
.select($"date", $"fullVisitorId", $"visitNumber", $"hits.hitNumber" as "hitNumber", $"hits.hour" as "hitHour", $"hits.minute" as "hitMinute")
flattenDF.printSchema()
flattenDF.show()
первый .select
помогает мне explode
первый массив hits
и я не хочу взрыватьсямассив (CustomDimension) в пределах hits
здесь, потому что он создает строку для каждого customDimension.Мне нужно для каждого попадания (uniqe by hitNumber) получить определенный customDimension (т.е. 16, 58 и т. Д.) И добавить их в виде столбца после той же записи.
Попытка с .withColumn
, но не полностью понимаю, как в параметре функции .withColumn
ссылаться на столбцы в массиве customDimensions, не разбирая его?!
Я нашелпример из документации по документам о выравнивании вложенных столбцов,
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
implicit class DataFrameFlattener(df: DataFrame) {
def flattenSchema: DataFrame = {
df.select(flatten(Nil, df.schema): _*)
}
protected def flatten(path: Seq[String], schema: DataType): Seq[Column] = schema match {
case s: StructType => s.fields.flatMap(f => flatten(path :+ f.name, f.dataType))
case other => col(path.map(n => s"`$n`").mkString(".")).as(path.mkString(".")) :: Nil
}
}
Я думаю, что это мне помогает, но, похоже, это не помогает мне с частью customDimension в качестве отдельного столбца для каждого индекса.
В T-SQL это будет выглядеть примерно так
SELECT
fullVisitorId,
visitNumber,
(SELECT max(case when customdimension.index = 16 then customdimension.value else '' end) as cd_16,
(SELECT max(case when customdimension.index = NN then customdimension.value else '' end) as cd_NN
FROM ga_sessions
это не работает T-sql, но чтобы продемонстрировать результат, который я хотел бы получить с блоком данных databricks.
Любая помощь в настройке моего в правильном направлении, и любые ссылки были бы великолепны!