У меня есть два набора данных (фреймы данных)
- idPeersDS - у которого есть столбец идентификаторов и идентификаторы его пиров.
- infoDS - имеет два столбца типа (тип1, тип2) и столбец метрики.
-
idPeersDS
+---+---------+
| id| peers|
+---+---------+
| 1|[1, 2, 3]|
| 2|[2, 1, 6]|
| 3|[3, 1, 2]|
| 4|[4, 5, 6]|
| 5|[5, 4, 6]|
| 6|[6, 1, 2]|
+---+---------+
infoDS
+---+-----+-----+------+
| id|type1|type2|metric|
+---+-----+-----+------+
| 1| A| X| 10.0|
| 1| A| Y| 20.0|
| 1| B| X| 30.0|
| 1| B| Y| 40.0|
| 2| A| Y| 10.0|
| 2| B| X| 20.0|
| 2| B| Y| 30.0|
| 3| A| X| 40.0|
| 4| B| Y| 10.0|
| 5| A| X| 20.0|
| 5| B| X| 30.0|
| 6| A| Y| 40.0|
| 6| B| Y| 10.0|
+---+-----+-----+------+
Мне нужно рассчитать zscore метрики для каждого идентификатора, сгруппированного по type1 и type2. Но это не показатель метрик для сгруппированных данных, а показатель метрик для одноранговых узлов в группе. Если у peerId нет метрики в группе, метрика peerId обрабатывается как 0.
Пример:
для группы («A», «X») и для id = 1 равноправными узлами являются (1,2,3), метрика для zscore будет (10, 0, 40); Так как id = 2 не существует в группе («A», «X»), оно равно 0. id = 5 не является равным id = 1, поэтому оно не является частью вычисления zscore.
+---+------+---------+-----------+
| id|metric| peers|type1|type2|
+---+------+---------+-----------+
| 1| 10.0|[1, 2, 3]| A| X|
| 3| 40.0|[3, 1, 2]| A| X|
| 5| 20.0|[5, 4, 6]| A| X|
Z = (X - μ) / σ
Z = (10 - 16.66666) / 16.99673
Z = -0.39223
Output should be the following table. I can compute zscore if `peersmetrics` column instead of `zScoreValue` column like my code did.
+---+------+---------+-----------+-----+-----+
| id|metric| peers|zScoreValue|type1|type2| peersmetrics
+---+------+---------+-----------+-----+-----+
| 1| 10.0|[1, 2, 3]| -0.39| A| X| [10, 0, 40]
| 3| 40.0|[3, 1, 2]| 1.37| A| X| [40, 10, 0]
| 5| 20.0|[5, 4, 6]| 1.41| A| X| [20, 0 , 0]
| 1| 40.0|[1, 2, 3]| 0.98| B| Y| [40, 30, 0]
| 2| 30.0|[2, 1, 6]| 0.27| B| Y| [30, 40, 10]
| 4| 10.0|[4, 5, 6]| 0.71| B| Y|
| 6| 10.0|[6, 1, 2]| -1.34| B| Y|
| 1| 30.0|[1, 2, 3]| 1.07| B| X|
| 2| 20.0|[2, 1, 6]| 0.27| B| X|
| 5| 30.0|[5, 4, 6]| 1.41| B| X|
| 1| 20.0|[1, 2, 3]| 1.22| A| Y|
| 2| 10.0|[2, 1, 6]| -1.07| A| Y|
| 6| 40.0|[6, 1, 2]| 1.34| A| Y|
+---+------+---------+-----------+-----+-----+
Edit1: решение SQL одинаково ценится. Я могу преобразовать SQL в Scala-код в моей искровой работе.
Следующее - мое решение, но вычисление занимает больше времени, чем я хотел.
размер истинных наборов данных:
У idPeersDS 17000, а у infoDS 17000 * 6 * 15
Любое другое решение очень ценится. Не стесняйтесь редактировать / рекомендовать название и правильную грамматику. Английский не мой родной язык. Благодарю.
Вот мой код.
val idPeersDS = Seq(
(1, Seq(1,2,3)),
(2, Seq(2,1,6)),
(3, Seq(3,1,2)),
(4, Seq(4,5,6)),
(5, Seq(5,4,6)),
(6, Seq(6,1,2))
).toDS.select($"_1" as "id", $"_2" as "peers")
val infoDS = Seq(
(1, "A", "X", 10),
(1, "A", "Y", 20),
(1, "B", "X", 30),
(1, "B", "Y", 40),
(2, "A", "Y", 10),
(2, "B", "X", 20),
(2, "B", "Y", 30),
(3, "A", "X", 40),
(4, "B", "Y", 10),
(5, "A", "X", 20),
(5, "B", "X", 30),
(6, "A", "Y", 40),
(6, "B", "Y", 10)
).toDS.select($"_1" as "id", $"_2" as "type1", $"_3" as "type2", $"_4" cast "double" as "metric")
def calculateZScoreGivenPeers(idMetricDS: DataFrame, irPeersDS: DataFrame, roundTo: Int = 2)
(implicit spark: SparkSession): DataFrame = {
import spark.implicits._
// for every id in the idMetricDS, get the peers and their metric for zscore, calculate zscore
val fir = idMetricDS.join(irPeersDS, "id")
val fsMapBroadcast = spark.sparkContext.broadcast(
idMetricDS.toDF.map((r: Row) => {r.getInt(0) -> r.getDouble(1)}).rdd.collectAsMap)
val fsMap = fsMapBroadcast.value
val funUdf = udf((currId: Int, xs: WrappedArray[Int]) => {
val zScoreMetrics: Array[Double] = xs.toArray.map(x => fsMap.getOrElse(x, 0.0))
val ds = new DescriptiveStatistics(zScoreMetrics)
val mean = ds.getMean()
val sd = Math.sqrt(ds.getPopulationVariance())
val zScore = if (sd == 0.0) {0.0} else {(fsMap.getOrElse(currId, 0.0)- mean) / sd}
zScore
})
val idStatsWithZscoreDS =
fir.withColumn("zScoreValue", round(funUdf(fir("id"), fir("peers")), roundTo))
fsMapBroadcast.unpersist
fsMapBroadcast.destroy
return idStatsWithZscoreDS
}
val typesComb = infoDS.select("type1", "type2").dropDuplicates.collect
val zScoreDS = typesComb.map(
ept => {
val et = ept.getString(0)
val pt = ept.getString(1)
val idMetricDS = infoDS.where($"type1" === lit(et) && $"type2" === lit(pt)).select($"id", $"metric")
val zScoreDS = calculateZScoreGivenPeers(idMetricDS, idPeersDS)(spark)
zScoreDS.select($"id", $"metric", $"peers", $"zScoreValue").withColumn("type1", lit(et)).withColumn("type2", lit(pt))
}
).reduce(_.union(_))
scala> idPeersDS.show(100)
+---+---------+
| id| peers|
+---+---------+
| 1|[1, 2, 3]|
| 2|[2, 1, 6]|
| 3|[3, 1, 2]|
| 4|[4, 5, 6]|
| 5|[5, 4, 6]|
| 6|[6, 1, 2]|
+---+---------+
scala> infoDS.show(100)
+---+-----+-----+------+
| id|type1|type2|metric|
+---+-----+-----+------+
| 1| A| X| 10.0|
| 1| A| Y| 20.0|
| 1| B| X| 30.0|
| 1| B| Y| 40.0|
| 2| A| Y| 10.0|
| 2| B| X| 20.0|
| 2| B| Y| 30.0|
| 3| A| X| 40.0|
| 4| B| Y| 10.0|
| 5| A| X| 20.0|
| 5| B| X| 30.0|
| 6| A| Y| 40.0|
| 6| B| Y| 10.0|
+---+-----+-----+------+
scala> typesComb
res3: Array[org.apache.spark.sql.Row] = Array([A,X], [B,Y], [B,X], [A,Y])
scala> zScoreDS.show(100)
+---+------+---------+-----------+-----+-----+
| id|metric| peers|zScoreValue|type1|type2|
+---+------+---------+-----------+-----+-----+
| 1| 10.0|[1, 2, 3]| -0.39| A| X|
| 3| 40.0|[3, 1, 2]| 1.37| A| X|
| 5| 20.0|[5, 4, 6]| 1.41| A| X|
| 1| 40.0|[1, 2, 3]| 0.98| B| Y|
| 2| 30.0|[2, 1, 6]| 0.27| B| Y|
| 4| 10.0|[4, 5, 6]| 0.71| B| Y|
| 6| 10.0|[6, 1, 2]| -1.34| B| Y|
| 1| 30.0|[1, 2, 3]| 1.07| B| X|
| 2| 20.0|[2, 1, 6]| 0.27| B| X|
| 5| 30.0|[5, 4, 6]| 1.41| B| X|
| 1| 20.0|[1, 2, 3]| 1.22| A| Y|
| 2| 10.0|[2, 1, 6]| -1.07| A| Y|
| 6| 40.0|[6, 1, 2]| 1.34| A| Y|
+---+------+---------+-----------+-----+-----+