Переслать Заполнить новую строку, чтобы учесть пропущенные даты - PullRequest
1 голос
/ 26 марта 2019

В настоящее время у меня есть набор данных, сгруппированный по часам с помощью переменной «агрегатор». В этих почасовых данных есть пробелы, и в идеале я бы хотел, чтобы они заполнили строки предыдущей строкой, которая сопоставляется с переменной в столбце x.

Я видел несколько решений подобных проблем с использованием PANDAS, но в идеале я хотел бы понять, как лучше всего подойти к этому с UDF для pyspark.

Первоначально я думал о чем-то вроде следующего с PANDAS, но также изо всех сил пытался реализовать это, чтобы просто заполнить игнорируя агрегатор как первый проход:

df = df.set_index(keys=[df.timestamp]).resample('1H', fill_method='ffill')

Но в идеале я бы хотел избежать использования PANDAS.

В приведенном ниже примере у меня есть две пропущенные строки почасовых данных (помечены как MISSING).

| timestamp            | aggregator |
|----------------------|------------|
| 2018-12-27T09:00:00Z | A          |
| 2018-12-27T10:00:00Z | A          |
| MISSING              | MISSING    |
| 2018-12-27T12:00:00Z | A          |
| 2018-12-27T13:00:00Z | A          |
| 2018-12-27T09:00:00Z | B          |
| 2018-12-27T10:00:00Z | B          |
| 2018-12-27T11:00:00Z | B          |
| MISSING              | MISSING    |
| 2018-12-27T13:00:00Z | B          |
| 2018-12-27T14:00:00Z | B          |

Ожидаемый результат здесь будет следующим:

| timestamp            | aggregator |
|----------------------|------------|
| 2018-12-27T09:00:00Z | A          |
| 2018-12-27T10:00:00Z | A          |
| 2018-12-27T11:00:00Z | A          |
| 2018-12-27T12:00:00Z | A          |
| 2018-12-27T13:00:00Z | A          |
| 2018-12-27T09:00:00Z | B          |
| 2018-12-27T10:00:00Z | B          |
| 2018-12-27T11:00:00Z | B          |
| 2018-12-27T12:00:00Z | B          |
| 2018-12-27T13:00:00Z | B          |
| 2018-12-27T14:00:00Z | B          |

Ценю помощь.

Спасибо.

1 Ответ

3 голосов
/ 26 марта 2019

Вот решение, чтобы восполнить недостающие часы.используя windows, lag и udf.С небольшими изменениями он может распространяться и на дни.

from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql.functions import *
from dateutil.relativedelta import relativedelta

def missing_hours(t1, t2):
    return [t1 + relativedelta(hours=-x) for x in range(1, t1.hour-t2.hour)]

missing_hours_udf = udf(missing_hours, ArrayType(TimestampType()))

df = spark.read.csv('dates.csv',header=True,inferSchema=True)

window = Window.partitionBy("aggregator").orderBy("timestamp")

df_mising = df.withColumn("prev_timestamp",lag(col("timestamp"),1, None).over(window))\
       .filter(col("prev_timestamp").isNotNull())\
       .withColumn("timestamp", explode(missing_hours_udf(col("timestamp"), col("prev_timestamp"))))\
       .drop("prev_timestamp")

df.union(df_mising).orderBy("aggregator","timestamp").show()

, что приводит к

+-------------------+----------+
|          timestamp|aggregator|
+-------------------+----------+
|2018-12-27 09:00:00|         A|
|2018-12-27 10:00:00|         A|
|2018-12-27 11:00:00|         A|
|2018-12-27 12:00:00|         A|
|2018-12-27 13:00:00|         A|
|2018-12-27 09:00:00|         B|
|2018-12-27 10:00:00|         B|
|2018-12-27 11:00:00|         B|
|2018-12-27 12:00:00|         B|
|2018-12-27 13:00:00|         B|
|2018-12-27 14:00:00|         B|
+-------------------+----------+
...