Можем ли мы динамически получить значение предыдущей строки для обновляемого столбца в фрейме данных pyspark - PullRequest
1 голос
/ 07 февраля 2020

Итак, вот мое точное постановка проблемы. Ниже приводится датафрейм.

+--------+-------+
|  START |  END  |
+--------+-------+
|   1    |   5   |
|   3    |   6   |
|   7    |   10  |
|   13   |   17  |
|   15   |   20  |
+--------+-------+

Представьте, что каждая строка представляет собой строку, начинающуюся с START и заканчивающуюся в END на оси X. И когда мы размещаем их в соответствии с данными, мы не хотим, чтобы линии пересекались. Таким образом, мы складываем их вместо этого.

Таким образом, первая строка остается той же, т. Е. (1, 5)

, поскольку вторая строка не совпадает с первой, нам нужно изменить ее значения START и END. Таким образом, из (3, 6) оно становится (5, 8). (Мы не можем изменить длину строки при укладке)

и третья строка (7, 10) становится (8, 11) (так как она пересекается с предыдущей (5, 8) строкой).

поскольку четвертая строка не пересекается с обновленной третьей строкой, мы не меняем ее значения. Таким образом, остается (13, 17)

, а последний (15, 20) становится (17, 22).

Итак, мой окончательный кадр данных должен быть:

+--------+-------+
|  START |  END  |
+--------+-------+
|   1    |   5   |
|   5    |   8   |
|   8    |   11  |
|   13   |   17  |
|   17   |   22  |
+--------+-------+

Вы можете считать, что исходный фрейм данных отсортирован по столбцу START.

Теперь это простая проблема, когда мы используем циклы, но я хотел сделать это в pyspark без использования циклов. Я новичок в pyspark и не могу найти хорошую функцию, чтобы я мог выполнить ее без циклов.

Теперь перейдем к своему названию, если я смогу получить значение END предыдущей строки динамически (как это изменения) и сравнить с текущим значением START, я могу решить эту проблему. Но я не смог найти ничего, что делает это.

Вот моя попытка решить эту проблему без циклов:

from pyspark import SparkContext 
from pyspark.sql import SQLContext
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window
from pyspark.sql import functions as F

sc = SparkContext('local', 'Example_2')
sqlcontext = SQLContext(sc)

df = sqlcontext.createDataFrame([(1, 5), (3, 6), (7, 10), (13, 17), (15, 20)], ['START', 'END'])

w = Window().orderBy('START').rangeBetween(Window.unboundedPreceding, -1)

# Updating 'END' column first
df = df.withColumn('END', F.when(
                F.last('END', True).over(w) > col('START'),
                col('END') + (F.last('END').over(w) - col('START'))
            ).otherwise(col('END')))

# Updating 'START' column
df = df.withColumn('START', F.when(
                    F.last('END', True).over(w) > col('START'),
                    F.last('END', True).over(w)
                ).otherwise(col('START')))

Поскольку F.last ('END') не дает обновленного завершения значение, приведенный выше код возвращает следующее, в котором третья строка неверна.

+-----+---+
|START|END|
+-----+---+
|    1|  5|
|    5|  8|
|    8| 10|
|   13| 17|
|   17| 22|
+-----+---+

Ответы [ 2 ]

0 голосов
/ 13 февраля 2020

Я не могу найти функцию, которая может извлечь значение предыдущей строки из столбца обновления. Я полагаю, что вы не можете сделать это в pyspark, и единственное обходное решение - использовать udf и использовать циклы внутри него для решения проблемы.

Но я могу решить проблему, о которой я упоминал в предыдущем вопросе, не используя любой udf и используя некоторые логи c.

Ниже мой код. И моя логика c была разработана более визуально, поэтому я не могу сказать это здесь. Если кому-то любопытно и не удалось понять мою логику c после просмотра кода, прокомментируйте ниже, чтобы я попытался объяснить это.

from pyspark import SparkContext 
from pyspark.sql import SQLContext
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window
from pyspark.sql import functions as F

sc = SparkContext('local', 'Example_2')
sqlcontext = SQLContext(sc)

df = sqlcontext.createDataFrame([(1, 5), (3, 6), (7, 10), (13, 17), (15, 20)], ['START', 'END'])

w = Window().orderBy('START')

df = df.withColumn('LENGTH', df.END - df.START)

df = df.withColumn('LENGTH_CUMU',
                   F.sum(df.LENGTH).over(w.rowsBetween(Window.unboundedPreceding, -1)))

df = df.withColumn('FIRST_START_DIFF',
                   df.START - F.first('START').over(w))

df = df.withColumn('REQ_SHIFT',
                   F.when(df.FIRST_START_DIFF > df.LENGTH_CUMU,
                          df.FIRST_START_DIFF - df.LENGTH_CUMU) \
                    .otherwise(0))

df = df.withColumn('REQ_SHIFT',
                   F.max('REQ_SHIFT').over(w.rowsBetween(Window.unboundedPreceding, 0)))

df = df.withColumn('START',
                   F.coalesce(df.START - df.FIRST_START_DIFF + df.LENGTH_CUMU + df.REQ_SHIFT, df.START))

df = df.withColumn('END', df.START + df.LENGTH)

df = df.select('START', 'END')

df.show()

Теперь он дает правильный вывод:

+-----+---+
|START|END|
+-----+---+
|    1|  5|
|    5|  8|
|    8| 11|
|   13| 17|
|   17| 22|
+-----+---+
0 голосов
/ 07 февраля 2020

Вам нужно использовать rowsBetween вместо rangeBetween. Так что это на самом деле выберет строки в окне spe c. Смотрите здесь https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=exception#pyspark. sql .Window.rowsBetween

w = Window().orderBy('START').rowsBetween(Window.unboundedPreceding, -1)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...