Pyspark: как получить текущий результат и 30-дневную среднюю оценку в одном ряду - PullRequest
0 голосов
/ 13 апреля 2019

У меня есть сценарий использования, в котором я хочу получить рейтинг за сегодня, а также среднее значение за 30 дней в виде столбца.Данные имеют данные за 30 дней для определенного идентификатора и типа.Данные выглядят следующим образом: -

  Id     Type        checkInDate      avgrank
   1     ALONE       2019-04-24      1.333333
   1     ALONE       2019-03-31      34.057471
   2     ALONE       2019-04-17      1.660842
   1     TOGETHER    2019-04-13      19.500000
   1     TOGETHER    2019-04-08      5.481203
   2     ALONE       2019-03-29      122.449156
   3     ALONE       2019-04-07      3.375000
   1     TOGETHER    2019-04-01      49.179719
   5     TOGETHER    2019-04-17      1.391753
   2     ALONE       2019-04-22      3.916667
   1     ALONE       2019-04-15      2.459151

В качестве результата я хочу получить вывод типа

  Id     Type        TodayAvg        30DayAvg
   1     ALONE       30.0            9.333333
   1     TOGETHER    1.0             34.057471
   2     ALONE       7.8             99.660842
   2     TOGETHER    3               19.500000

..

Я думаю, что я могу добиться этого, имея 2 кадра данных, один из которых выполняет фильтрацию по сегодняшней дате, а второй - в среднем в течение 30 дней, а затем объединяет сегодняшние кадры данных по идентификатору и типу

.
rank = glueContext.create_dynamic_frame.from_catalog(database="testing", table_name="rank", transformation_ctx="rank")

filtert_rank = Filter.apply(frame=rank, f=lambda x: (x["checkInDate"] == curr_dt))

rank_avg = glueContext.create_dynamic_frame.from_catalog(database="testing", table_name="rank", transformation_ctx="rank_avg")

rank_avg_f = rank_avg.groupBy("id", "type").agg(F.mean("avgrank"))

rank_join = filtert_rank.join(rank_avg, ["id", "type"], how='inner')

Есть ли более простой способ сделать это, т. Е. Без чтения кадра данных дважды?

1 Ответ

0 голосов
/ 07 мая 2019

Вы можете преобразовать динамический фрейм в фрейм данных Apache Spark и выполнить обычный sql.

Проверьте документацию для toDF () и sparksql.

...