Ресэмплирование данных PySpark с месяцев до недель - PullRequest
1 голос
/ 29 апреля 2019

Фрейм данных входного pyspark имеет одну строку на key_id и date_month. Для одного случайного key_id это выглядит так

+--------+-------------+---------+---------+
| key_id | date_month  | value_1 | value_2 |
+--------+-------------+---------+---------+
|      1 | 2019-02-01  |   1.135 | 'a'     |
|      1 | 2019-03-01  |   0.165 | 'b'     |
|      1 | 2019-04-01  |     0.0 | null    |
+--------+-------------+---------+---------+

Это должно быть пересчитано до еженедельной детализации, чтобы выглядеть следующим образом

+--------+-------------+---------+---------+
| key_id |  date_week  | value_1 | value_2 |
+--------+-------------+---------+---------+
|      1 | 2019-02-04  |   1.135 | 'a'     |
|      1 | 2019-02-11  |   1.135 | 'a'     |
|      1 | 2019-02-18  |   1.135 | 'a'     |
|      1 | 2019-02-25  |   1.135 | 'a'     |
|      1 | 2019-03-04  |   0.165 | 'b'     |
|      1 | 2019-03-11  |   0.165 | 'b'     |
|      1 | 2019-03-18  |   0.165 | 'b'     |
|      1 | 2019-03-25  |   0.165 | 'b'     |
|      1 | 2019-04-01  |     0.0 | null    |
|      1 | 2019-04-08  |     0.0 | null    |
|      1 | 2019-04-15  |     0.0 | null    |
|      1 | 2019-04-22  |     0.0 | null    |
|      1 | 2019-04-29  |     0.0 | null    |
+--------+-------------+---------+---------+

В настоящее время это ~ 30 строк кода переключения между фреймами данных PySpark и Pandas: смена диапазона дат, объединений и т. Д.

Есть ли способ сделать это в PySpark простым способом?

Я пытался Панды пересчитать с месяцев до недель , но я не могу понять, как заставить его работать, когда мой "первичный ключ" представляет собой комбинацию date_month и key_id.

В настоящее время число строк в исходном фрейме данных невелико ~ 250 КБ, и, я думаю, преобразование фрейма данных PySpark toPandas() и последующее преобразование в Pandas является приемлемым вариантом.

1 Ответ

0 голосов
/ 01 мая 2019

Решение, приведенное ниже, включает в себя составление карт от месяцев до недель (где недели - понедельники месяца) и присоединение их к вашим исходным данным.

Скучный раздел для имитации ваших данных:

## Replicate data with join trick to get out nulls
## Convert string to date format

import pyspark.sql.functions as F

c = ['key_id','date_month','value_1']
d = [(1,'2019-02-01',1.135),
        (1,'2019-03-01',0.165),
        (1,'2019-04-01',0.0)]

c2 = ['date_month','value_2']
d2 = [('2019-02-01','a'),
      ('2019-03-01','b')]

df = spark.createDataFrame(d,c)
df2 = spark.createDataFrame(d2,c2)

test_df = df.join(df2, how = 'left', on = 'date_month')

test_df_date = test_df.withColumn('date_month', F.to_date(test_df['date_month']))

test_df_date.orderBy('date_month').show() 

Ваши данные:

+----------+------+-------+-------+
|date_month|key_id|value_1|value_2|
+----------+------+-------+-------+
|2019-02-01|     1|  1.135|      a|
|2019-03-01|     1|  0.165|      b|
|2019-04-01|     1|    0.0|   null|
+----------+------+-------+-------+

Постройте картограф, используя хитрый трюк из: получите вседаты между двумя датами в Spark DataFrame

Заканчивается с отображением месяца, с начала недели в месяце (Вы можете сделать это прямо к исходным данным вместо создания сопоставителя.)

## Build month to week mapper

## Get first and last of each month, and number of days between
months = test_df_date.select('date_month').distinct()
months = months.withColumn('date_month_end', F.last_day(F.col('date_month')))
months = months.withColumn('days', F.datediff(F.col('date_month_end'), 
                                              F.col('date_month')))

## Use trick from https://stackoverflow.com/questions/51745007/get-all-the-dates-between-two-dates-in-spark-dataframe 
## Adds a column 'day_in_month' with all days in the month from first to last. 
## 
months = months.withColumn("repeat", F.expr("split(repeat(',', days), ',')"))\
    .select("*", F.posexplode("repeat").alias("day_in_month", "val"))\
    .drop("repeat", "val", "days")\
    .withColumn("day_in_month", F.expr("date_add(date_month, day_in_month)"))\

## Add integer day of week value - Sunday == 1, Monday == 2,
## Filter by mondays,
## Rename and drop columns 
months = months.withColumn('day', F.dayofweek(F.col('day_in_month')))
months = months.filter(F.col('day') == 2)
month_week_mapper = months.withColumnRenamed('day_in_month', 'date_week')\
    .drop('day', 'date_month_end')

month_week_mapper.orderBy('date_week').show()

Mapper выглядит следующим образом:

+----------+----------+
|date_month| date_week|
+----------+----------+
|2019-02-01|2019-02-04|
|2019-02-01|2019-02-11|
|2019-02-01|2019-02-18|
|2019-02-01|2019-02-25|
|2019-03-01|2019-03-04|
|2019-03-01|2019-03-11|
|2019-03-01|2019-03-18|
|2019-03-01|2019-03-25|
|2019-04-01|2019-04-01|
|2019-04-01|2019-04-08|
|2019-04-01|2019-04-15|
|2019-04-01|2019-04-22|
|2019-04-01|2019-04-29|
+----------+----------+

Затем мы выполняем левое соединение с исходными данными, каждый месяц присоединяется к каждой из соответствующих недель.Последняя строка просто отбрасывает лишние столбцы и переупорядочивает строки / столбцы в соответствии с желаемым результатом.

## Perform the join, and do some cleanup to get results into order/format specified above. 
out_df = test_df_date.join(month_week_mapper, on = 'date_month', how = 'left')

out_df.drop('date_month')\
    .select('key_id','date_week','value_1','value_2')\
    .orderBy('date_week')\
    .show()
## Gives me an output of:
+------+----------+-------+-------+
|key_id| date_week|value_1|value_2|
+------+----------+-------+-------+
|     1|2019-02-04|  1.135|      a|
|     1|2019-02-11|  1.135|      a|
|     1|2019-02-18|  1.135|      a|
|     1|2019-02-25|  1.135|      a|
|     1|2019-03-04|  0.165|      b|
|     1|2019-03-11|  0.165|      b|
|     1|2019-03-18|  0.165|      b|
|     1|2019-03-25|  0.165|      b|
|     1|2019-04-01|    0.0|   null|
|     1|2019-04-08|    0.0|   null|
|     1|2019-04-15|    0.0|   null|
|     1|2019-04-22|    0.0|   null|
|     1|2019-04-29|    0.0|   null|
+------+----------+-------+-------+

Это должно работать с вашим столбцом KeyID, хотя вам нужно будет проверить его с некоторыми немного более разнообразными данными, чтобы быть уверенным.

Я бы определенно выступил за то, чтобы сделать что-то похожее на вышесказанное, вместо того, чтобы переходить в Панд и обратно.df.toPandas довольно медленный, и если размер ваших данных со временем увеличивается, метод Pandas в какой-то момент потерпит неудачу, и вы (или кто-либо когда-либо поддерживает код) столкнетесь с этой проблемой, в любом случае.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...