Here is what I believe is part of a solution to your immediate questions; I leave some aspects for you to fill in. There are other approaches, but this is my quick take based on what I understand. No foreach is required. I may have misunderstood what you need; apologies if so. With this approach you may want to consider .cache. Good luck.
// Assumes country names are spelled the same way, consistently, in both datasets.
// It is not clear whether you want results per date or for selected dates only. For selected dates, use another list.
// This approach should scale because it relies on JOIN and aggregation rather than foreach, etc.
// Spark will fuse the code together where it can, but there are still shuffles.
// This is for Country and State. You can apply the same approach to Country alone and then UNION the two DataFrames, giving them common column names and definitions. Try it out.
// NB: You can build a custom grouping key by concatenating Country and State, or leave the columns as they are and, for the second query, fill in Country and put a blank value into State.
// I leave that up to you.
import spark.implicits._
import org.apache.spark.sql.functions._
val dfC = Seq(("USA", "Ohio"), ("NZ", "Otago")).toDF("sCountry", "sState") // Your search criteria at Country / State level; a simple .isin will not do here - why?
val d = List("23-10-2001", "12-12-2003") // or Array
val dfS = Seq(
("USA", "Ohio", "23-10-2001", 2),
("USA", "Ohio", "23-10-2001", 2),
("USA", "Ohio", "23-10-2011", 2),
("USA", "Texas", "23-10-2001", 2),
("USA", "Virgina", "23-10-2001", 10),
("USA", "Virgina", "23-10-2001", 6),
("USA", "vanDiemensLand", "23-10-2001", 26),
("NL", "vanDiemensLand", "23-10-2001", 16),
("UK", "Middlesex", "23-10-2001", 3)
).toDF("country", "state", "date", "some_val")
dfS.show(false)
// 1. For Country & State
// The JOIN is an inner join, so it acts as a filter and removes the need for an .isin over multiple columns, i.e. Country||State.
val df1 = dfS.join(dfC, (dfS("country") === dfC("sCountry")) && (dfS("state") === dfC("sState"))).drop("sCountry").drop("sState")
df1.show(false)
val df2 = df1.filter($"date".isin(d:_*)).groupBy("country", "state").avg("some_val")
df2.show(false)
// 2. For Country only
// ... for you to fill in
// ...
// 3. UNION df2 & df3
// ...
// 4. Save with partitioning.
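
For orientation only, here is one possible way steps 2 to 4 could look. It is a sketch under my own assumptions, not the definitive answer: the country-only criteria DataFrame `dfCountry`, the column alias `avg_val`, the blank `state` value and the output path `/tmp/avg_by_country_state` are all hypothetical, and the sample rows are a shortened copy of the ones above. `dfS` is cached because it feeds two queries.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// In spark-shell this returns the existing session; master is only set for a standalone local run.
val spark = SparkSession.builder().master("local[*]").appName("avg-sketch").getOrCreate()
import spark.implicits._

val d = List("23-10-2001", "12-12-2003")

// Shortened sample data; cached because it is read by both queries below.
val dfS = Seq(
  ("USA", "Ohio", "23-10-2001", 2),
  ("USA", "Ohio", "23-10-2011", 2),
  ("USA", "Texas", "23-10-2001", 2),
  ("UK", "Middlesex", "23-10-2001", 3)
).toDF("country", "state", "date", "some_val").cache()

// 1. Country & State criteria, same pattern as above, with an explicit alias for the average.
val dfC = Seq(("USA", "Ohio"), ("NZ", "Otago")).toDF("sCountry", "sState")
val df2 = dfS
  .join(dfC, $"country" === $"sCountry" && $"state" === $"sState")
  .drop("sCountry", "sState")
  .filter($"date".isin(d: _*))
  .groupBy("country", "state")
  .agg(avg("some_val").as("avg_val"))

// 2. Country only (hypothetical criteria); state is filled with a blank so the schemas line up.
val dfCountry = Seq("USA").toDF("sCountry")
val df3 = dfS
  .join(dfCountry, $"country" === $"sCountry")
  .drop("sCountry")
  .filter($"date".isin(d: _*))
  .groupBy("country")
  .agg(avg("some_val").as("avg_val"))
  .withColumn("state", lit(""))
  .select("country", "state", "avg_val")

// 3. UNION by column name rather than by position.
val result = df2.unionByName(df3)
result.show(false)

// 4. Save with partitioning; the path and the choice of partition column are assumptions.
result.write.mode("overwrite").partitionBy("country").parquet("/tmp/avg_by_country_state")
```

`unionByName` needs Spark 2.3 or later; on older versions a plain `union` works as long as both DataFrames select their columns in the same order.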