Итак, вот мое точное постановка проблемы. Ниже приводится датафрейм.
+--------+-------+
| START | END |
+--------+-------+
| 1 | 5 |
| 3 | 6 |
| 7 | 10 |
| 13 | 17 |
| 15 | 20 |
+--------+-------+
Представьте, что каждая строка представляет собой строку, начинающуюся с START
и заканчивающуюся в END
на оси X. И когда мы размещаем их в соответствии с данными, мы не хотим, чтобы линии пересекались. Таким образом, мы складываем их вместо этого.
Таким образом, первая строка остается той же, т. Е. (1, 5)
, поскольку вторая строка не совпадает с первой, нам нужно изменить ее значения START
и END
. Таким образом, из (3, 6) оно становится (5, 8). (Мы не можем изменить длину строки при укладке)
и третья строка (7, 10) становится (8, 11) (так как она пересекается с предыдущей (5, 8) строкой).
поскольку четвертая строка не пересекается с обновленной третьей строкой, мы не меняем ее значения. Таким образом, остается (13, 17)
, а последний (15, 20) становится (17, 22).
Итак, мой окончательный кадр данных должен быть:
+--------+-------+
| START | END |
+--------+-------+
| 1 | 5 |
| 5 | 8 |
| 8 | 11 |
| 13 | 17 |
| 17 | 22 |
+--------+-------+
Вы можете считать, что исходный фрейм данных отсортирован по столбцу START
.
Теперь это простая проблема, когда мы используем циклы, но я хотел сделать это в pyspark без использования циклов. Я новичок в pyspark и не могу найти хорошую функцию, чтобы я мог выполнить ее без циклов.
Теперь перейдем к своему названию, если я смогу получить значение END
предыдущей строки динамически (как это изменения) и сравнить с текущим значением START
, я могу решить эту проблему. Но я не смог найти ничего, что делает это.
Вот моя попытка решить эту проблему без циклов:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window
from pyspark.sql import functions as F
sc = SparkContext('local', 'Example_2')
sqlcontext = SQLContext(sc)
df = sqlcontext.createDataFrame([(1, 5), (3, 6), (7, 10), (13, 17), (15, 20)], ['START', 'END'])
w = Window().orderBy('START').rangeBetween(Window.unboundedPreceding, -1)
# Updating 'END' column first
df = df.withColumn('END', F.when(
F.last('END', True).over(w) > col('START'),
col('END') + (F.last('END').over(w) - col('START'))
).otherwise(col('END')))
# Updating 'START' column
df = df.withColumn('START', F.when(
F.last('END', True).over(w) > col('START'),
F.last('END', True).over(w)
).otherwise(col('START')))
Поскольку F.last ('END') не дает обновленного завершения значение, приведенный выше код возвращает следующее, в котором третья строка неверна.
+-----+---+
|START|END|
+-----+---+
| 1| 5|
| 5| 8|
| 8| 10|
| 13| 17|
| 17| 22|
+-----+---+