Улей запрос, чтобы найти количество недель в середине - PullRequest
4 голосов
/ 10 мая 2019

У меня есть таблица, как показано ниже

id      week    count   
A100    201008  2    
A100    201009  9    
A100    201010  16    
A100    201011  23    
A100    201012  30    
A100    201013  36    
A100    201015  43    
A100    201017  50    
A100    201018  57    
A100    201019  63    
A100    201023  70    
A100    201024  82    
A100    201025  88    
A100    201026  95    
A100    201027  102

Здесь мы видим, что ниже недели отсутствуют:

  • Первый 201014 отсутствует
  • Второй 201016 отсутствует
  • Третьи недели отсутствуют, 201020, 201021, 201022

Мое требование: всякий раз, когда у нас пропущены значения, нам нужно показывать счет предыдущей недели.

В этом случае вывод должен быть:

id      week    count
A100    201008  2    
A100    201009  9    
A100    201010  16    
A100    201011  23    
A100    201012  30   
A100    201013  36    
A100    201014  36    
A100    201015  43    
A100    201016  43    
A100    201017  50    
A100    201018  57    
A100    201019  63    
A100    201020  63
A100    201021  63    
A100    201022  63    
A100    201023  70    
A100    201024  82    
A100    201025  88    
A100    201026  95    
A100    201027  102

Как мне выполнить это требование, используя hive / pyspark?

Ответы [ 2 ]

3 голосов
/ 17 мая 2019

Решение PySpark

Образцы данных

df = spark.createDataFrame([(1,201901,10),
                            (1,201903,9),
                            (1,201904,21),
                            (1,201906,42),
                            (1,201909,3),
                            (1,201912,56)
                           ],['id','weeknum','val'])
df.show()
+---+-------+---+
| id|weeknum|val|
+---+-------+---+
|  1| 201901| 10|
|  1| 201903|  9|
|  1| 201904| 21|
|  1| 201906| 42|
|  1| 201909|  3|
|  1| 201912| 56|
+---+-------+---+

1) Основная идея заключается в создании комбинации всех идентификаторов и недель (начиная с минимально возможного значения)до максимума) с cross join.

from pyspark.sql.functions import min,max,sum,when
from pyspark.sql import Window
min_max_week = df.agg(min(df.weeknum),max(df.weeknum)).collect()
#Generate all weeks using range
all_weeks = spark.range(min_max_week[0][0],min_max_week[0][1]+1)
all_weeks = all_weeks.withColumnRenamed('id','weekno')
#all_weeks.show()
id_all_weeks = df.select(df.id).distinct().crossJoin(all_weeks).withColumnRenamed('id','aid')
#id_all_weeks.show()

2) После этого left join исходный кадр данных для этих комбинаций помогает идентифицировать пропущенные значения.

res = id_all_weeks.join(df,(df.id == id_all_weeks.aid) & (df.weeknum == id_all_weeks.weekno),'left')
res.show()
+---+------+----+-------+----+
|aid|weekno|  id|weeknum| val|
+---+------+----+-------+----+
|  1|201911|null|   null|null|
|  1|201905|null|   null|null|
|  1|201903|   1| 201903|   9|
|  1|201904|   1| 201904|  21|
|  1|201901|   1| 201901|  10|
|  1|201906|   1| 201906|  42|
|  1|201908|null|   null|null|
|  1|201910|null|   null|null|
|  1|201912|   1| 201912|  56|
|  1|201907|null|   null|null|
|  1|201902|null|   null|null|
|  1|201909|   1| 201909|   3|
+---+------+----+-------+----+

3) Затем используйте комбинацию оконных функций, sum -> для назначения групп и max -> для заполнения пропущенных значений после классификации групп.

w1 = Window.partitionBy(res.aid).orderBy(res.weekno)
groups = res.withColumn("grp",sum(when(res.id.isNull(),0).otherwise(1)).over(w1))
w2 = Window.partitionBy(groups.aid,groups.grp)
missing_values_filled = groups.withColumn('filled',max(groups.val).over(w2)) #select required columns as needed
missing_values_filled.show() 

+---+------+----+-------+----+---+------+
|aid|weekno|  id|weeknum| val|grp|filled|
+---+------+----+-------+----+---+------+
|  1|201901|   1| 201901|  10|  1|    10|
|  1|201902|null|   null|null|  1|    10|
|  1|201903|   1| 201903|   9|  2|     9|
|  1|201904|   1| 201904|  21|  3|    21|
|  1|201905|null|   null|null|  3|    21|
|  1|201906|   1| 201906|  42|  4|    42|
|  1|201907|null|   null|null|  4|    42|
|  1|201908|null|   null|null|  4|    42|
|  1|201909|   1| 201909|   3|  5|     3|
|  1|201910|null|   null|null|  5|     3|
|  1|201911|null|   null|null|  5|     3|
|  1|201912|   1| 201912|  56|  6|    56|
+---+------+----+-------+----+---+------+

Запрос улья с той же логикой, что и описанная выше (при условии, что может быть создана таблица со всеми неделями)

select id,weeknum,max(val) over(partition by id,grp) as val
from (select i.id
            ,w.weeknum
            ,t.val
            ,sum(case when t.id is null then 0 else 1 end) over(partition by i.id order by w.weeknum) as grp 
      from (select distinct id from tbl) i
      cross join weeks_table w
      left join tbl t on t.id = i.id and w.weeknum = t.weeknum
     ) t
3 голосов
/ 13 мая 2019

Хотя этот ответ на Scala, версия Python будет выглядеть почти так же и может быть легко преобразована.

Шаг 1:

Найдите строки, у которых отсутствует значение недели (ей).

Пример ввода:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

//sample input
val input = sc.parallelize(List(("A100",201008,2), ("A100",201009,9),("A100",201014,4), ("A100",201016,45))).toDF("id","week","count")

scala> input.show
+----+------+-----+
|  id|  week|count|
+----+------+-----+
|A100|201008|    2|
|A100|201009|    9|
|A100|201014|    4| //missing 4 rows
|A100|201016|   45| //missing 1 row
+----+------+-----+ 

Чтобы найти его, мы можем использовать функцию .lead() на week. И вычислите разницу между leadWeek и week. Разница не должна быть> 1, если перед ней пропущена строка.

val diffDF = input
  .withColumn("leadWeek", lead($"week", 1).over(Window.partitionBy($"id").orderBy($"week")))   // partitioning by id & computing lead()
  .withColumn("diff", ($"leadWeek" - $"week") -1)                                 // finding difference between leadWeek & week

scala> diffDF.show
+----+------+-----+--------+----+
|  id|  week|count|leadWeek|diff|
+----+------+-----+--------+----+
|A100|201008|    2|  201009|   0| // diff -> 0 represents that no rows needs to be added
|A100|201009|    9|  201014|   4| // diff -> 4 represents 4 rows are to be added after this row.
|A100|201014|    4|  201016|   1| // diff -> 1 represents 1 row to be added after this row.
|A100|201016|   45|    null|null|
+----+------+-----+--------+----+

Шаг 2:

  • Если diff>> 1: создать и добавить n строк (InputWithDiff, проверьте класс регистраций ниже), как указано в diff и увеличить week значение соответственно. Вернуть недавно созданные строки вместе с исходной строкой.
  • Если diff равен 0, никаких дополнительных вычислений не требуется. Вернуть исходную строку как есть.

Преобразование diffDF в набор данных для простоты вычислений.

case class InputWithDiff(id: Option[String], week: Option[Int], count: Option[Int], leadWeek: Option[Int], diff: Option[Int])

val diffDS = diffDF.as[InputWithDiff]

val output = diffDS.flatMap(x => {
 val diff = x.diff.getOrElse(0) 

 diff match {
  case n if n >= 1 => x :: (1 to diff).map(y => InputWithDiff(x.id, Some(x.week.get + y), x.count,x.leadWeek, x.diff)).toList  // create and append new Rows
  case _ => List(x)      // return as it is
 }
}).drop("leadWeek", "diff").toDF   // drop unnecessary columns & convert to DF

конечный результат:

scala> output.show
+----+------+-----+
|  id|  week|count|
+----+------+-----+
|A100|201008|    2|
|A100|201009|    9|
|A100|201010|    9|
|A100|201011|    9|
|A100|201012|    9|
|A100|201013|    9|
|A100|201014|    4|
|A100|201015|    4|
|A100|201016|   45|
+----+------+-----+
...