Как обработать этот сценарий использования (данные запущенного окна) в spark - PullRequest
1 голос
/ 15 апреля 2020

Я использую spark- sql -2.4.1v с java 1.8.

Иметь исходные данные, как показано ниже:

  val df_data = Seq(
  ("Indus_1","Indus_1_Name","Country1", "State1",12789979,"2020-03-01"),
  ("Indus_1","Indus_1_Name","Country1", "State1",12789979,"2019-06-01"),
  ("Indus_1","Indus_1_Name","Country1", "State1",12789979,"2019-03-01"),

  ("Indus_2","Indus_2_Name","Country1", "State2",21789933,"2020-03-01"),
  ("Indus_2","Indus_2_Name","Country1", "State2",300789933,"2018-03-01"),

  ("Indus_3","Indus_3_Name","Country1", "State3",27989978,"2019-03-01"),
  ("Indus_3","Indus_3_Name","Country1", "State3",30014633,"2017-06-01"),
  ("Indus_3","Indus_3_Name","Country1", "State3",30014633,"2017-03-01"),

  ("Indus_4","Indus_4_Name","Country2", "State1",41789978,"2020-03-01"),
  ("Indus_4","Indus_4_Name","Country2", "State1",41789978,"2018-03-01"),

  ("Indus_5","Indus_5_Name","Country3", "State3",67789978,"2019-03-01"),
  ("Indus_5","Indus_5_Name","Country3", "State3",67789978,"2018-03-01"),
  ("Indus_5","Indus_5_Name","Country3", "State3",67789978,"2017-03-01"),

  ("Indus_6","Indus_6_Name","Country1", "State1",37899790,"2020-03-01"),
  ("Indus_6","Indus_6_Name","Country1", "State1",37899790,"2020-06-01"),
  ("Indus_6","Indus_6_Name","Country1", "State1",37899790,"2018-03-01"),

  ("Indus_7","Indus_7_Name","Country3", "State1",26689900,"2020-03-01"),
  ("Indus_7","Indus_7_Name","Country3", "State1",26689900,"2020-12-01"),
  ("Indus_7","Indus_7_Name","Country3", "State1",26689900,"2019-03-01"),

  ("Indus_8","Indus_8_Name","Country1", "State2",212359979,"2018-03-01"),
  ("Indus_8","Indus_8_Name","Country1", "State2",212359979,"2018-09-01"),
  ("Indus_8","Indus_8_Name","Country1", "State2",212359979,"2016-03-01"),

  ("Indus_9","Indus_9_Name","Country4", "State1",97899790,"2020-03-01"),
  ("Indus_9","Indus_9_Name","Country4", "State1",97899790,"2019-09-01"),
  ("Indus_9","Indus_9_Name","Country4", "State1",97899790,"2016-03-01")
  ).toDF("industry_id","industry_name","country","state","revenue","generated_date");

Запрос:

val distinct_gen_date = df_data.select("generated_date").distinct.orderBy(desc("generated_date"));

Для каждой «созданной даты» в списке Different_gen_date необходимо получить все уникальные идентификаторы industry_ids за 6 месяцев

val cols = {col("industry_id")}
 val ws = Window.partitionBy(cols).orderBy(desc("generated_date"));

val newDf = df_data
                .withColumn("rank",rank().over(ws))
                .where(col("rank").equalTo(lit(1)))
                //.drop(col("rank"))
                .select("*");

Как получить движущуюся совокупность (по уникальным идентификаторам industry_ids за 6 месяцев) ) для каждого отдельного элемента, как добиться этой движущейся агрегации.

более подробно:

Пример, в данных данных данного примера, предположим, взят из "2020- 03-01 "to" 2016-03-01 ". если в "2020-03-01" отсутствует какой-либо industry_x, необходимо проверить "2020-02-01", "2020-01-01", "2019-12-01", "2019-11-01", " 2019-10-01 "," 2019-09-01 "последовательно всякий раз, когда мы обнаруживаем, что ранг 1 принимается во внимание для этого набора данных для расчета данных" 2020-03-01 "...... мы следующие go .. "2020-02-01", т. Е. Каждая отдельная "генерируемая дата" .. для каждой отдельной даты go назад 6 месяцев получают уникальные отрасли промышленности .. выберите данные ранга 1 ... эти данные для. "2020-02-01 "... затем выберите другой отличный" generate_date "и сделайте то же самое с этим ..... здесь набор данных постоянно меняется .... используя для l oop я могу сделать, но это не дает параллемы .. как выбрать различные набор данных для каждого отдельного параллеля «генерируемая дата»?

1 Ответ

1 голос
/ 19 апреля 2020

Я не знаю, как это сделать с помощью оконных функций, но самостоятельное объединение может решить вашу проблему.

Во-первых, вам нужен DataFrame с разными датами:

val df_dates = df_data
  .select("generated_date")
  .withColumnRenamed("generated_date", "distinct_date")
  .distinct()

Далее , для каждой строки в ваших отраслевых данных вам нужно рассчитать, до какой даты эта отрасль будет включена, то есть добавить 6 месяцев к generated_date. Я думаю о них как о активных датах . Я использовал add_months () , чтобы сделать это, но вы можете подумать о другой логике.

import org.apache.spark.sql.functions.add_months
val df_active = df_data.withColumn("active_date", add_months(col("generated_date"), 6))

Если мы начнем с этих данных (разделенных датой только для наших глаз):

  industry_id     generated_date
(("Indus_1", ..., "2020-03-01"),

 ("Indus_1", ..., "2019-12-01"),
 ("Indus_2", ..., "2019-12-01"),

 ("Indus_3", ..., "2018-06-01"))

Теперь оно имеет:

  industry_id     generated_date active_date
(("Indus_1", ..., "2020-03-01", "2020-09-01"),

 ("Indus_1", ..., "2019-12-01", "2020-06-01"),
 ("Indus_2", ..., "2019-12-01", "2020-06-01")

 ("Indus_3", ..., "2018-06-01", "2018-12-01"))

Теперь приступайте к самостоятельному объединению на основе дат, используя условие объединения, соответствующее вашему 6-месячному периоду:

val condition: Column = (
  col("distinct_date") >= col("generated_date")).and(
  col("distinct_date") <= col("active_date"))

val df_joined = df_dates.join(df_active, condition, "inner")

df_joined теперь имеет:

  distinct_date industry_id     generated_date active_date
(("2020-03-01", "Indus_1", ..., "2020-03-01", "2020-09-01"),
 ("2020-03-01", "Indus_1", ..., "2019-12-01", "2020-06-01"),
 ("2020-03-01", "Indus_2", ..., "2019-12-01", "2020-06-01"),

 ("2019-12-01", "Indus_1", ..., "2019-12-01", "2020-06-01"),
 ("2019-12-01", "Indus_2", ..., "2019-12-01", "2020-06-01"),

 ("2018-06-01", "Indus_3", ..., "2018-06-01", "2018-12-01"))

Удалите этот вспомогательный столбец active_date или даже лучше, отбросьте дубликаты в зависимости от ваших потребностей:

val df_result = df_joined.dropDuplicates(Seq("distinct_date", "industry_id"))

Что отбрасывает дубликат "Indus_1" в "2020-03-01" (он появился дважды, потому что он получен из двух разных generated_date с):

  distinct_date industry_id
(("2020-03-01", "Indus_1"),
 ("2020-03-01", "Indus_2"),

 ("2019-12-01", "Indus_1"),
 ("2019-12-01", "Indus_2"),

 ("2018-06-01", "Indus_3"))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...