pyspark вперёд заполнить столбцы Time Stamp определенным значением (1 секунда) - PullRequest
0 голосов
/ 09 июля 2019

в прошлом я задавал этот вопрос относительно библиотеки панд Python: pandas forward fill столбцы Time Stamp с определенным значением (1 секунда)

Но теперь я буду выполнять огромную обработку данных в pyspark, поэтому попросил бы другое решение в pyspark:

У меня есть искра DataFrame:

df = spark.createDataFrame([Row(a=1, b='2018-09-26 04:38:32.544', c='11', d='foo'),
                            Row(a=2, b='', c='22', d='bar'),
                            Row(a=3, b='', c='33', d='foo'),
                            Row(a=4, b='', c='44', d='bar'),
                            Row(a=5, b='2018-09-26 04:58:32.544', c='55', d='foo'),
                            Row(a=6, b='', c='66', d='bar')])
df.show(truncate=False)

|a  |b                      |c  |d  |
+---+-----------------------+---+---+
|1  |2018-09-26 04:38:32.544|11 |foo|
|2  |                       |22 |bar|
|3  |                       |33 |foo|
|4  |                       |44 |bar|
|5  |2018-09-26 04:58:32.544|55 |foo|
|6  |                       |66 |bar|
+---+-----------------------+---+---+

И я хотел бы последовательно добавить 1 секунду к каждому NaT из предыдущих доступных:

|a  |b                      |c  |d  |
+---+-----------------------+---+---+
|1  |2018-09-26 04:38:32.544|11 |foo|
|2  |2018-09-26 04:39:32.544|22 |bar|
|3  |2018-09-26 04:40:32.544|33 |foo|
|4  |2018-09-26 04:41:32.544|44 |bar|
|5  |2018-09-26 04:58:32.544|55 |foo|
|6  |2018-09-26 04:59:32.544|66 |bar|
+---+-----------------------+---+---+

Я читал, что udf следует избегать, так как они замедляют обработку миллионов строк. Спасибо за помощь!

1 Ответ

1 голос
/ 10 июля 2019

Возможно, это не самое эффективное решение, так как мы не можем разбить фрейм данных в соответствии с вашими требованиями.Это означает, что все данные загружаются в один раздел и упорядочиваются там.Возможно, кто-то может предложить лучшее решение.

В приведенном ниже коде используется оконная функция lag , которая возвращает значение предыдущей строки.Мы применяем это только тогда, когда текущее значение для b равно нулю, в противном случае мы сохраняем текущее значение.Когда текущее значение равно нулю, мы добавляем одну секунду к значению предыдущей строки.Мы должны сделать это несколько раз как строку, которая содержит ноль в столбце b, а предыдущая строка, которая также содержит ноль в столбце 'b', получит ноль, возвращаемый из запаздывания (т. Е. Запаздывание не применяется последовательно, и поэтому мы имеемсделать это самостоятельно).

import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql import Window

df = spark.createDataFrame([Row(a=1, b='2018-09-26 04:38:32.544', c='11', d='foo'),
                            Row(a=2, b='', c='22', d='bar'),
                            Row(a=3, b='', c='33', d='foo'),
                            Row(a=4, b='', c='44', d='bar'),
                            Row(a=5, b='2018-09-26 04:58:32.544', c='55', d='foo'),
                            Row(a=6, b='', c='66', d='bar')])

df = df.withColumn('a',  df.a.cast("int"))
df = df.withColumn('b',  df.b.cast("timestamp"))

w = Window.orderBy('a')

while df.filter(df.b.isNull()).count() != 0:
    df = df.withColumn('b', F.when(df.b.isNotNull(), df.b).otherwise(F.lag('b').over(w)  + F.expr('INTERVAL 1 SECONDS')))

df.show(truncate=False)

Вывод:

+---+-----------------------+---+---+ 
| a |                     b | c | d | 
+---+-----------------------+---+---+ 
| 1 |2018-09-26 04:38:32.544|11 |foo| 
| 2 |2018-09-26 04:38:33.544|22 |bar| 
| 3 |2018-09-26 04:38:34.544|33 |foo| 
| 4 |2018-09-26 04:38:35.544|44 |bar| 
| 5 |2018-09-26 04:58:32.544|55 |foo| 
| 6 |2018-09-26 04:58:33.544|66 |bar| 
+---+-----------------------+---+---+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...