Как рассчитать дни между последним выполнением условия? - PullRequest
0 голосов
/ 13 января 2020

текущий df:

df = spark.createDataFrame([
    ("2020-01-12","d1",0),
    ("2020-01-12","d2",0),
    ("2020-01-13","d3",0),
    ("2020-01-14","d4",1), 
    ("2020-01-15","d5",0),
    ("2020-01-15","d6",0),
    ("2020-01-16","d7",0),
    ("2020-01-17","d8",0),
    ("2020-01-18","d9",1),
    ("2020-01-19","d10",0),
    ("2020-01-20","d11",0),], 
    ['date', 'device', 'condition'])

df.show()

+----------+------+---------+
|      date|device|condition|
+----------+------+---------+
|2020-01-12|    d1|        0|
|2020-01-12|    d2|        0|
|2020-01-13|    d3|        0|
|2020-01-14|    d4|        1|
|2020-01-15|    d5|        0|
|2020-01-15|    d6|        0|
|2020-01-16|    d7|        0|
|2020-01-17|    d8|        0|
|2020-01-18|    d9|        1|
|2020-01-19|   d10|        0|
|2020-01-20|   d11|        0|
+----------+------+---------+

желаемый вывод df:

want_df = spark.createDataFrame([
    ("2020-01-12","d1",0,0),
    ("2020-01-12","d2",0,0),
    ("2020-01-13","d3",0,1),
    ("2020-01-14","d4",1,2), 
    ("2020-01-15","d5",0,1),
    ("2020-01-15","d6",0,1),
    ("2020-01-16","d7",0,2),
    ("2020-01-17","d8",0,3),
    ("2020-01-18","d9",1,4),
    ("2020-01-19","d10",0,1),
    ("2020-01-20","d11",0,2),], 
    ['date', 'device', 'condition', 'life'])

want_df.show()

+----------+------+---------+----+
|      date|device|condition|life|
+----------+------+---------+----+
|2020-01-12|    d1|        0|   0|
|2020-01-12|    d2|        0|   0|
|2020-01-13|    d3|        0|   1|
|2020-01-14|    d4|        1|   2|
|2020-01-15|    d5|        0|   1|
|2020-01-15|    d6|        0|   1|
|2020-01-16|    d7|        0|   2|
|2020-01-17|    d8|        0|   3|
|2020-01-18|    d9|        1|   4|
|2020-01-19|   d10|        0|   1|
|2020-01-20|   d11|        0|   2|
+----------+------+---------+----+

Цель состоит в том, чтобы вычислить разницу в дате (# дней) до даты, когда condition=1 затем дата разница сбрасывается на количество дней, начиная с момента выполнения последнего условия. life - столбец, который пытается вычислить. Есть идеи, как рассчитать это? Window или lag?

Ответы [ 2 ]

1 голос
/ 14 января 2020

Я пытаюсь предоставить другой способ, более близкий к стандартному диалекту sql, но все же использующий синтаксис pyspark и относящийся к влиянию на производительность.

from pyspark.sql import Window
from pyspark.sql.functions import col, when, lit, lag, min, max, datediff

Выберите диапазон дат, который соответствует условию 1, затем объединение с максимальным значением даты с помощью функции объединения.

w = Window.partitionBy('date')

dateRange = df.select(df.date).where(df.condition == 1)\
    .union(df.select(max(df.date))).distinct()\
    .orderBy('date')\
    .withColumn('lastDate', lag(col('date').over(w))\
    .select(when(col('lastDate').isNull(), lit('1970-01-01')).otherwise(col('lastDate')).alias('lastDate'), col('date').alias('toDate'))

Выберите диапазон дат и первую минимальную дату, соединив df с диапазоном дат, затем выполните группировку и вычислите минимальное значение даты.

dateRange1st = df.join(dateRange, df.date > dateRange.lastDate & df.date <= dateRange.toDate, 'inner').groupBy(dateRange.lastDate, dateRange.toDate).agg(min(df.date).alias('frDate'))

Выберите результат, соединив диапазон дат (1-й) с df, для фильтрации справочной даты и выясните другое.

result = df.join(dateRange1st, df.date.between(dateRange1st.frDate, dateRange1st.toDate), 'inner')\
    .select(df.date, df.device, df.condition)\
    .withColumn('life', datediff(df.date - dataRange1st.frDate))\
    .orderBy(df.date)

result.show()

Надеюсь, это поможет!

1 голос
/ 13 января 2020

Это один из типов вопросов, который можно упростить, добавив несколько временных строк (мы помечаем их, а затем удаляем их позже)

from pyspark.sql import Window
from pyspark.sql.functions import lit, lag, sum as fsum, first, datediff

(1) Сначала создайте новый фрейм данных df1, который реплицирует все Строки с условием == 1, но с установленным условием = 0 и флагом = 1, объединяют результирующий кадр данных с исходным кадром данных (установите флаг = 0):

df1 = df.withColumn('flag', lit(0)).union(
    df.where('condition = 1').withColumn('condition', lit(0)).withColumn('flag', lit(1))
)

(2) Затем установите следующее две спецификации окна, используйте w1, чтобы помочь создать метку подгруппы g, чтобы сгруппировать все последовательные строки до тех пор, пока условие не переключится с 1 на 0. добавьте flag в orderBy (), чтобы вновь добавленные строки располагались прямо позади соответствующая им строка с условием = 1 и сгруппированная в следующую метку группы.

w1 = Window.partitionBy(lit(0)).orderBy('date', 'flag')
w2 = Window.partitionBy(lit(0), 'g').orderBy('date', 'flag')

Примечание: Если у вас огромный массив данных, вы можете изменить lit(0) в некоторые фактические или расчетные столбцы, чтобы избежать Spark, перемещая все строки в один раздел. ОБНОВЛЕНИЕ: На основе комментариев, фрейм данных представляет собой один временной ряд, который может быть загружен в один раздел, поэтому достаточно использовать lit(0).

(3) использовать lag и Функция суммирования по w1 позволяет найти метку подгруппы «g», а затем вычислить first_date в той же группе, используя WindowSpe c w2. эта дата используется для вычисления столбца 'life':

df2 = df1.withColumn('g', fsum((lag('condition').over(w1) == 1).astype('int')).over(w1)) \
    .withColumn('first_date', first('date').over(w2)) \
    .withColumn('life', datediff('date','first_date'))
df2.show()
+----------+------+---------+----+---+----------+----+
|      date|device|condition|flag|  g|first_date|life|
+----------+------+---------+----+---+----------+----+
|2020-01-12|    d1|        0|   0|  0|2020-01-12|   0|
|2020-01-12|    d2|        0|   0|  0|2020-01-12|   0|
|2020-01-13|    d3|        0|   0|  0|2020-01-12|   1|
|2020-01-14|    d4|        1|   0|  0|2020-01-12|   2|
|2020-01-14|    d4|        0|   1|  1|2020-01-14|   0|
|2020-01-15|    d5|        0|   0|  1|2020-01-14|   1|
|2020-01-15|    d6|        0|   0|  1|2020-01-14|   1|
|2020-01-16|    d7|        0|   0|  1|2020-01-14|   2|
|2020-01-17|    d8|        0|   0|  1|2020-01-14|   3|
|2020-01-18|    d9|        1|   0|  1|2020-01-14|   4|
|2020-01-18|    d9|        0|   1|  2|2020-01-18|   0|
|2020-01-19|   d10|        0|   0|  2|2020-01-18|   1|
|2020-01-20|   d11|        0|   0|  2|2020-01-18|   2|
+----------+------+---------+----+---+----------+----+

(4) удалить временные строки и столбцы для получения окончательного кадра данных:

df_new = df2.filter('flag = 0').drop('first_date', 'g', 'flag')
df_new.show()
+----------+------+---------+----+
|      date|device|condition|life|
+----------+------+---------+----+
|2020-01-12|    d1|        0|   0|
|2020-01-12|    d2|        0|   0|
|2020-01-13|    d3|        0|   1|
|2020-01-14|    d4|        1|   2|
|2020-01-15|    d5|        0|   1|
|2020-01-15|    d6|        0|   1|
|2020-01-16|    d7|        0|   2|
|2020-01-17|    d8|        0|   3|
|2020-01-18|    d9|        1|   4|
|2020-01-19|   d10|        0|   1|
|2020-01-20|   d11|        0|   2|
+----------+------+---------+----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...