Я использую spark- sql -2.4.1v с java 1.8.
Иметь исходные данные, как показано ниже:
val df_data = Seq(
("Indus_1","Indus_1_Name","Country1", "State1",12789979,"2020-03-01"),
("Indus_1","Indus_1_Name","Country1", "State1",12789979,"2019-06-01"),
("Indus_1","Indus_1_Name","Country1", "State1",12789979,"2019-03-01"),
("Indus_2","Indus_2_Name","Country1", "State2",21789933,"2020-03-01"),
("Indus_2","Indus_2_Name","Country1", "State2",300789933,"2018-03-01"),
("Indus_3","Indus_3_Name","Country1", "State3",27989978,"2019-03-01"),
("Indus_3","Indus_3_Name","Country1", "State3",30014633,"2017-06-01"),
("Indus_3","Indus_3_Name","Country1", "State3",30014633,"2017-03-01"),
("Indus_4","Indus_4_Name","Country2", "State1",41789978,"2020-03-01"),
("Indus_4","Indus_4_Name","Country2", "State1",41789978,"2018-03-01"),
("Indus_5","Indus_5_Name","Country3", "State3",67789978,"2019-03-01"),
("Indus_5","Indus_5_Name","Country3", "State3",67789978,"2018-03-01"),
("Indus_5","Indus_5_Name","Country3", "State3",67789978,"2017-03-01"),
("Indus_6","Indus_6_Name","Country1", "State1",37899790,"2020-03-01"),
("Indus_6","Indus_6_Name","Country1", "State1",37899790,"2020-06-01"),
("Indus_6","Indus_6_Name","Country1", "State1",37899790,"2018-03-01"),
("Indus_7","Indus_7_Name","Country3", "State1",26689900,"2020-03-01"),
("Indus_7","Indus_7_Name","Country3", "State1",26689900,"2020-12-01"),
("Indus_7","Indus_7_Name","Country3", "State1",26689900,"2019-03-01"),
("Indus_8","Indus_8_Name","Country1", "State2",212359979,"2018-03-01"),
("Indus_8","Indus_8_Name","Country1", "State2",212359979,"2018-09-01"),
("Indus_8","Indus_8_Name","Country1", "State2",212359979,"2016-03-01"),
("Indus_9","Indus_9_Name","Country4", "State1",97899790,"2020-03-01"),
("Indus_9","Indus_9_Name","Country4", "State1",97899790,"2019-09-01"),
("Indus_9","Indus_9_Name","Country4", "State1",97899790,"2016-03-01")
).toDF("industry_id","industry_name","country","state","revenue","generated_date");
Запрос:
val distinct_gen_date = df_data.select("generated_date").distinct.orderBy(desc("generated_date"));
Для каждой «созданной даты» в списке Different_gen_date необходимо получить все уникальные идентификаторы industry_ids за 6 месяцев
val cols = {col("industry_id")}
val ws = Window.partitionBy(cols).orderBy(desc("generated_date"));
val newDf = df_data
.withColumn("rank",rank().over(ws))
.where(col("rank").equalTo(lit(1)))
//.drop(col("rank"))
.select("*");
Как получить движущуюся совокупность (по уникальным идентификаторам industry_ids за 6 месяцев) ) для каждого отдельного элемента, как добиться этой движущейся агрегации.
более подробно:
Пример, в данных данных данного примера, предположим, взят из "2020- 03-01 "to" 2016-03-01 ". если в "2020-03-01" отсутствует какой-либо industry_x, необходимо проверить "2020-02-01", "2020-01-01", "2019-12-01", "2019-11-01", " 2019-10-01 "," 2019-09-01 "последовательно всякий раз, когда мы обнаруживаем, что ранг 1 принимается во внимание для этого набора данных для расчета данных" 2020-03-01 "...... мы следующие go .. "2020-02-01", т. Е. Каждая отдельная "генерируемая дата" .. для каждой отдельной даты go назад 6 месяцев получают уникальные отрасли промышленности .. выберите данные ранга 1 ... эти данные для. "2020-02-01 "... затем выберите другой отличный" generate_date "и сделайте то же самое с этим ..... здесь набор данных постоянно меняется .... используя для l oop я могу сделать, но это не дает параллемы .. как выбрать различные набор данных для каждого отдельного параллеля «генерируемая дата»?