Spark, агрегация SQL на основе второго набора данных - PullRequest
0 голосов
/ 12 марта 2019

У меня есть два набора данных (фреймы данных)

  1. idPeersDS - у которого есть столбец идентификаторов и идентификаторы его пиров.
  2. infoDS - имеет два столбца типа (тип1, тип2) и столбец метрики.

-

idPeersDS
+---+---------+
| id|    peers|
+---+---------+
|  1|[1, 2, 3]|
|  2|[2, 1, 6]|
|  3|[3, 1, 2]|
|  4|[4, 5, 6]|
|  5|[5, 4, 6]|
|  6|[6, 1, 2]|
+---+---------+


infoDS
+---+-----+-----+------+
| id|type1|type2|metric|
+---+-----+-----+------+
|  1|    A|    X|  10.0|
|  1|    A|    Y|  20.0|
|  1|    B|    X|  30.0|
|  1|    B|    Y|  40.0|
|  2|    A|    Y|  10.0|
|  2|    B|    X|  20.0|
|  2|    B|    Y|  30.0|
|  3|    A|    X|  40.0|
|  4|    B|    Y|  10.0|
|  5|    A|    X|  20.0|
|  5|    B|    X|  30.0|
|  6|    A|    Y|  40.0|
|  6|    B|    Y|  10.0|
+---+-----+-----+------+

Мне нужно рассчитать zscore метрики для каждого идентификатора, сгруппированного по type1 и type2. Но это не показатель метрик для сгруппированных данных, а показатель метрик для одноранговых узлов в группе. Если у peerId нет метрики в группе, метрика peerId обрабатывается как 0.

Пример: для группы («A», «X») и для id = 1 равноправными узлами являются (1,2,3), метрика для zscore будет (10, 0, 40); Так как id = 2 не существует в группе («A», «X»), оно равно 0. id = 5 не является равным id = 1, поэтому оно не является частью вычисления zscore.

+---+------+---------+-----------+
| id|metric|    peers|type1|type2|
+---+------+---------+-----------+
|  1|  10.0|[1, 2, 3]|    A|    X|
|  3|  40.0|[3, 1, 2]|    A|    X|
|  5|  20.0|[5, 4, 6]|    A|    X|
Z = (X - μ) / σ
Z = (10 - 16.66666) / 16.99673

Z = -0.39223

 Output should be the following table. I can compute zscore if `peersmetrics` column instead of `zScoreValue` column like my code did.
    +---+------+---------+-----------+-----+-----+
    | id|metric|    peers|zScoreValue|type1|type2|    peersmetrics
    +---+------+---------+-----------+-----+-----+
    |  1|  10.0|[1, 2, 3]|      -0.39|    A|    X|    [10, 0, 40]
    |  3|  40.0|[3, 1, 2]|       1.37|    A|    X|    [40, 10, 0]
    |  5|  20.0|[5, 4, 6]|       1.41|    A|    X|    [20, 0 , 0]
    |  1|  40.0|[1, 2, 3]|       0.98|    B|    Y|    [40, 30, 0]
    |  2|  30.0|[2, 1, 6]|       0.27|    B|    Y|    [30, 40, 10]
    |  4|  10.0|[4, 5, 6]|       0.71|    B|    Y|
    |  6|  10.0|[6, 1, 2]|      -1.34|    B|    Y|
    |  1|  30.0|[1, 2, 3]|       1.07|    B|    X|
    |  2|  20.0|[2, 1, 6]|       0.27|    B|    X|
    |  5|  30.0|[5, 4, 6]|       1.41|    B|    X|
    |  1|  20.0|[1, 2, 3]|       1.22|    A|    Y|
    |  2|  10.0|[2, 1, 6]|      -1.07|    A|    Y|
    |  6|  40.0|[6, 1, 2]|       1.34|    A|    Y|
    +---+------+---------+-----------+-----+-----+

Edit1: решение SQL одинаково ценится. Я могу преобразовать SQL в Scala-код в моей искровой работе.

Следующее - мое решение, но вычисление занимает больше времени, чем я хотел. размер истинных наборов данных: У idPeersDS 17000, а у infoDS 17000 * 6 * 15

Любое другое решение очень ценится. Не стесняйтесь редактировать / рекомендовать название и правильную грамматику. Английский не мой родной язык. Благодарю.

Вот мой код.

val idPeersDS = Seq(
  (1, Seq(1,2,3)),
  (2, Seq(2,1,6)),
  (3, Seq(3,1,2)),
  (4, Seq(4,5,6)),
  (5, Seq(5,4,6)),
  (6, Seq(6,1,2))
).toDS.select($"_1" as "id", $"_2" as "peers")

val infoDS = Seq(
  (1, "A", "X", 10),
  (1, "A", "Y", 20),
  (1, "B", "X", 30),
  (1, "B", "Y", 40),
  (2, "A", "Y", 10),
  (2, "B", "X", 20),
  (2, "B", "Y", 30),
  (3, "A", "X", 40),
  (4, "B", "Y", 10),
  (5, "A", "X", 20),
  (5, "B", "X", 30),
  (6, "A", "Y", 40),
  (6, "B", "Y", 10)
).toDS.select($"_1" as "id", $"_2" as "type1", $"_3" as "type2", $"_4" cast "double" as "metric")




def calculateZScoreGivenPeers(idMetricDS: DataFrame, irPeersDS: DataFrame, roundTo: Int = 2)
(implicit spark: SparkSession): DataFrame = {

  import spark.implicits._

  // for every id in the idMetricDS, get the peers and their metric for zscore, calculate zscore
  val fir = idMetricDS.join(irPeersDS, "id")
  val fsMapBroadcast = spark.sparkContext.broadcast(
    idMetricDS.toDF.map((r: Row) => {r.getInt(0) -> r.getDouble(1)}).rdd.collectAsMap)
  val fsMap = fsMapBroadcast.value
  val funUdf = udf((currId: Int, xs: WrappedArray[Int]) => {
    val zScoreMetrics: Array[Double] = xs.toArray.map(x => fsMap.getOrElse(x, 0.0))
    val ds = new DescriptiveStatistics(zScoreMetrics)
    val mean = ds.getMean()
    val sd = Math.sqrt(ds.getPopulationVariance())
    val zScore = if (sd == 0.0) {0.0} else {(fsMap.getOrElse(currId, 0.0)- mean) / sd}
    zScore
  })

  val idStatsWithZscoreDS =
    fir.withColumn("zScoreValue", round(funUdf(fir("id"), fir("peers")), roundTo))
  fsMapBroadcast.unpersist
  fsMapBroadcast.destroy
  return idStatsWithZscoreDS

}

val typesComb = infoDS.select("type1", "type2").dropDuplicates.collect

val zScoreDS = typesComb.map(
  ept => {
    val et = ept.getString(0)
    val pt = ept.getString(1)
    val idMetricDS = infoDS.where($"type1" === lit(et) && $"type2" === lit(pt)).select($"id", $"metric")
    val zScoreDS = calculateZScoreGivenPeers(idMetricDS, idPeersDS)(spark)
    zScoreDS.select($"id", $"metric", $"peers", $"zScoreValue").withColumn("type1", lit(et)).withColumn("type2", lit(pt))
  }
).reduce(_.union(_))


scala> idPeersDS.show(100)
+---+---------+
| id|    peers|
+---+---------+
|  1|[1, 2, 3]|
|  2|[2, 1, 6]|
|  3|[3, 1, 2]|
|  4|[4, 5, 6]|
|  5|[5, 4, 6]|
|  6|[6, 1, 2]|
+---+---------+


scala> infoDS.show(100)
+---+-----+-----+------+
| id|type1|type2|metric|
+---+-----+-----+------+
|  1|    A|    X|  10.0|
|  1|    A|    Y|  20.0|
|  1|    B|    X|  30.0|
|  1|    B|    Y|  40.0|
|  2|    A|    Y|  10.0|
|  2|    B|    X|  20.0|
|  2|    B|    Y|  30.0|
|  3|    A|    X|  40.0|
|  4|    B|    Y|  10.0|
|  5|    A|    X|  20.0|
|  5|    B|    X|  30.0|
|  6|    A|    Y|  40.0|
|  6|    B|    Y|  10.0|
+---+-----+-----+------+


scala> typesComb
res3: Array[org.apache.spark.sql.Row] = Array([A,X], [B,Y], [B,X], [A,Y])

scala> zScoreDS.show(100)
+---+------+---------+-----------+-----+-----+
| id|metric|    peers|zScoreValue|type1|type2|
+---+------+---------+-----------+-----+-----+
|  1|  10.0|[1, 2, 3]|      -0.39|    A|    X|
|  3|  40.0|[3, 1, 2]|       1.37|    A|    X|
|  5|  20.0|[5, 4, 6]|       1.41|    A|    X|
|  1|  40.0|[1, 2, 3]|       0.98|    B|    Y|
|  2|  30.0|[2, 1, 6]|       0.27|    B|    Y|
|  4|  10.0|[4, 5, 6]|       0.71|    B|    Y|
|  6|  10.0|[6, 1, 2]|      -1.34|    B|    Y|
|  1|  30.0|[1, 2, 3]|       1.07|    B|    X|
|  2|  20.0|[2, 1, 6]|       0.27|    B|    X|
|  5|  30.0|[5, 4, 6]|       1.41|    B|    X|
|  1|  20.0|[1, 2, 3]|       1.22|    A|    Y|
|  2|  10.0|[2, 1, 6]|      -1.07|    A|    Y|
|  6|  40.0|[6, 1, 2]|       1.34|    A|    Y|
+---+------+---------+-----------+-----+-----+

1 Ответ

1 голос
/ 14 марта 2019

Я решил это. Вот мой ответ. Это решение работало значительно быстрее (<1/10-е), чем мое предыдущее решение, которое у меня было в вопросе о моих реальных наборах данных. Я избегал сбора данных для водителя и карты и объединения наборов данных в уменьшении. </p>

val idPeersDS = Seq(
  (1, Seq(1,2,3)),
  (2, Seq(2,1,6)),
  (3, Seq(3,1,2)),
  (4, Seq(4,5,6)),
  (5, Seq(5,4,6)),
  (6, Seq(6,1,2))
).toDS.select($"_1" as "id", $"_2" as "peers")

val infoDS = Seq(
  (1, "A", "X", 10),
  (1, "A", "Y", 20),
  (1, "B", "X", 30),
  (1, "B", "Y", 40),
  (2, "A", "Y", 10),
  (2, "B", "X", 20),
  (2, "B", "Y", 30),
  (3, "A", "X", 40),
  (4, "B", "Y", 10),
  (5, "A", "X", 20),
  (5, "B", "X", 30),
  (6, "A", "Y", 40),
  (6, "B", "Y", 10)
).toDS.select($"_1" as "id", $"_2" as "type1", $"_3" as "type2", $"_4" cast "double" as "metric")


// Exiting paste mode, now interpreting.

idPeersDS: org.apache.spark.sql.DataFrame = [id: int, peers: array<int>]
infoDS: org.apache.spark.sql.DataFrame = [id: int, type1: string ... 2 more fields]

scala> idPeersDS.show
+---+---------+
| id|    peers|
+---+---------+
|  1|[1, 2, 3]|
|  2|[2, 1, 6]|
|  3|[3, 1, 2]|
|  4|[4, 5, 6]|
|  5|[5, 4, 6]|
|  6|[6, 1, 2]|
+---+---------+


scala> infoDS.show
+---+-----+-----+------+
| id|type1|type2|metric|
+---+-----+-----+------+
|  1|    A|    X|  10.0|
|  1|    A|    Y|  20.0|
|  1|    B|    X|  30.0|
|  1|    B|    Y|  40.0|
|  2|    A|    Y|  10.0|
|  2|    B|    X|  20.0|
|  2|    B|    Y|  30.0|
|  3|    A|    X|  40.0|
|  4|    B|    Y|  10.0|
|  5|    A|    X|  20.0|
|  5|    B|    X|  30.0|
|  6|    A|    Y|  40.0|
|  6|    B|    Y|  10.0|
+---+-----+-----+------+


scala> val infowithpeers = infoDS.join(idPeersDS, "id")
infowithpeers: org.apache.spark.sql.DataFrame = [id: int, type1: string ... 3 more fields]

scala> infowithpeers.show
+---+-----+-----+------+---------+
| id|type1|type2|metric|    peers|
+---+-----+-----+------+---------+
|  1|    A|    X|  10.0|[1, 2, 3]|
|  1|    A|    Y|  20.0|[1, 2, 3]|
|  1|    B|    X|  30.0|[1, 2, 3]|
|  1|    B|    Y|  40.0|[1, 2, 3]|
|  2|    A|    Y|  10.0|[2, 1, 6]|
|  2|    B|    X|  20.0|[2, 1, 6]|
|  2|    B|    Y|  30.0|[2, 1, 6]|
|  3|    A|    X|  40.0|[3, 1, 2]|
|  4|    B|    Y|  10.0|[4, 5, 6]|
|  5|    A|    X|  20.0|[5, 4, 6]|
|  5|    B|    X|  30.0|[5, 4, 6]|
|  6|    A|    Y|  40.0|[6, 1, 2]|
|  6|    B|    Y|  10.0|[6, 1, 2]|
+---+-----+-----+------+---------+


scala> val joinMap = udf { values: Seq[Map[Int,Double]] => values.flatten.toMap }
joinMap: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,MapType(IntegerType,DoubleType,false),Some(List(ArrayType(MapType(IntegerType,DoubleType,false),true))))

scala> val zScoreCal = udf { (metric: Double, zScoreMetrics: WrappedArray[Double]) =>
    |   val ds = new DescriptiveStatistics(zScoreMetrics.toArray)
    |   val mean = ds.getMean()
    |   val sd = Math.sqrt(ds.getPopulationVariance())
    |   val zScore = if (sd == 0.0) {0.0} else {(metric - mean) / sd}
    |   zScore
    | }
zScoreCal: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,DoubleType,Some(List(DoubleType, ArrayType(DoubleType,false))))

scala> :paste
// Entering paste mode (ctrl-D to finish)


val infowithpeersidmetric = infowithpeers.withColumn("idmetric", map($"id",$"metric"))
val idsingrpdf = infowithpeersidmetric.groupBy("type1","type2").agg(joinMap(collect_list(map($"id", $"metric"))) as "idsingrp")

val metricsMap = udf { (peers: Seq[Int], values: Map[Int,Double]) => {
    peers.map(p => values.getOrElse(p,0.0))
  }
}

// Exiting paste mode, now interpreting.

infowithpeersidmetric: org.apache.spark.sql.DataFrame = [id: int, type1: string ... 4 more fields]
idsingrpdf: org.apache.spark.sql.DataFrame = [type1: string, type2: string ... 1 more field]
metricsMap: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,ArrayType(DoubleType,false),Some(List(ArrayType(IntegerType,false), MapType(IntegerType,DoubleType,false))))

scala> val infoWithMap = infowithpeers.join(idsingrpdf, Seq("type1","type2")).withColumn("zScoreMetrics", metricsMap($"peers", $"idsingrp")).withColumn("zscore", round(zScoreCal($"metric",$"zScoreMetrics"),2))
infoWithMap: org.apache.spark.sql.DataFrame = [type1: string, type2: string ... 6 more fields]

scala> infoWithMap.show
+-----+-----+---+------+---------+--------------------+------------------+------+
|type1|type2| id|metric|    peers|            idsingrp|     zScoreMetrics|zscore|
+-----+-----+---+------+---------+--------------------+------------------+------+
|    A|    X|  1|  10.0|[1, 2, 3]|[3 -> 40.0, 5 -> ...| [10.0, 0.0, 40.0]| -0.39|
|    A|    Y|  1|  20.0|[1, 2, 3]|[2 -> 10.0, 6 -> ...| [20.0, 10.0, 0.0]|  1.22|
|    B|    X|  1|  30.0|[1, 2, 3]|[1 -> 30.0, 2 -> ...| [30.0, 20.0, 0.0]|  1.07|
|    B|    Y|  1|  40.0|[1, 2, 3]|[4 -> 10.0, 1 -> ...| [40.0, 30.0, 0.0]|  0.98|
|    A|    Y|  2|  10.0|[2, 1, 6]|[2 -> 10.0, 6 -> ...|[10.0, 20.0, 40.0]| -1.07|
|    B|    X|  2|  20.0|[2, 1, 6]|[1 -> 30.0, 2 -> ...| [20.0, 30.0, 0.0]|  0.27|
|    B|    Y|  2|  30.0|[2, 1, 6]|[4 -> 10.0, 1 -> ...|[30.0, 40.0, 10.0]|  0.27|
|    A|    X|  3|  40.0|[3, 1, 2]|[3 -> 40.0, 5 -> ...| [40.0, 10.0, 0.0]|  1.37|
|    B|    Y|  4|  10.0|[4, 5, 6]|[4 -> 10.0, 1 -> ...| [10.0, 0.0, 10.0]|  0.71|
|    A|    X|  5|  20.0|[5, 4, 6]|[3 -> 40.0, 5 -> ...|  [20.0, 0.0, 0.0]|  1.41|
|    B|    X|  5|  30.0|[5, 4, 6]|[1 -> 30.0, 2 -> ...|  [30.0, 0.0, 0.0]|  1.41|
|    A|    Y|  6|  40.0|[6, 1, 2]|[2 -> 10.0, 6 -> ...|[40.0, 20.0, 10.0]|  1.34|
|    B|    Y|  6|  10.0|[6, 1, 2]|[4 -> 10.0, 1 -> ...|[10.0, 40.0, 30.0]| -1.34|
+-----+-----+---+------+---------+--------------------+------------------+------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...