Я сделал это со SCALA, так что вам нужно будет конвертировать, но я думаю, что это намного проще. Я добавил ключ и сделал на ключевом уровне, вы можете адаптировать и усвоить это. Но принцип намного проще. Не требуется коррелированных подзапросов. Просто реляционное исчисление. Используемый номер для дат и т. Д.
// SCALA
// Slightly ambiguous on hols vs. weekend, as you stated treated as 1
import spark.implicits._
import org.apache.spark.sql.functions._
val dfE = Seq(
("NIC", 1, false, false),
("NIC", 2, false, false),
("NIC", 3, true, false),
("NIC", 4, true, true),
("NIC", 5, false, false),
("NIC", 6, false, false),
("XYZ", 1, false, true)
).toDF("e","d","w", "h")
//dfE.show(false)
val dfE2 = dfE.withColumn("wh", when ($"w" or $"h", 1) otherwise (0)).drop("w").drop("h")
//dfE2.show()
//Assuming more dfD's can exist
val dfD = Seq(
("NIC", 1, 4, "k1"),
("NIC", 2, 3, "k2"),
("NIC", 1, 1, "k3"),
("NIC", 7, 10, "k4")
).toDF("e","pd","dd", "k")
//dfD.show(false)
dfE2.createOrReplaceTempView("E2")
dfD.createOrReplaceTempView("D1")
// This done per record, if over identical keys, then strip k and aggr otherwise, I added k for checking each entry
// Point is it is far easier. Key means synthetic grouping by.
val q=sqlContext.sql(""" SELECT d1.k, d1.e, d1.pd, d1.dd, sum(e2.wh)
FROM D1, E2
WHERE D1.e = E2.e
AND E2.d >= D1.pd
AND E2.d <= D1.dd
GROUP BY d1.k, d1.e, d1.pd, d1.dd
ORDER BY d1.k, d1.e, d1.pd, d1.dd
""")
q.show
возвращается:
+---+---+---+---+-------+
| k| e| pd| dd|sum(wh)|
+---+---+---+---+-------+
| k1|NIC| 1| 4| 2|
| k2|NIC| 2| 3| 1|
| k3|NIC| 1| 1| 0|
+---+---+---+---+-------+
Я думаю, что простое улучшение производительности может быть сделано. На самом деле никаких взаимосвязанных вещей не требуется.
Можно использовать AND E2.d МЕЖДУ D1.pd И D1.dd, если хотите.