Различие элементов в списке PySpark с одним или двумя элементами - PullRequest
1 голос
/ 24 апреля 2020

У меня есть фрейм данных PySpark, в котором есть список с одним или двумя элементами. Когда в списке есть два элемента, они не упорядочены по возрастанию или убыванию.

+--------+----------+-------+
| version| timestamp| list  |
+--------+-----+----|-------+
| v1     |2012-01-10| [5,2] |
| v1     |2012-01-11| [2,5] |
| v1     |2012-01-12| [3,2] |
| v2     |2012-01-12| [2]   |
| v2     |2012-01-11| [1,2] |
| v2     |2012-01-13| [1]   |
+--------+----------+-------+

Я хочу взять разницу между первым и вторым элементами списка (при наличии двух элементов) и использовать ее в качестве другого столбца (diff). Когда в списке есть только один элемент, я хочу поставить ноль на выходе. Вот пример вывода, который я хочу.

+--------+----------+-------+-------+
| version| timestamp| list  |  diff | 
+--------+-----+----|-------+-------+
| v1     |2012-01-10| [5,2] |   3   |
| v1     |2012-01-11| [2,5] |  -3   |
| v1     |2012-01-12| [3,2] |   1   |
| v2     |2012-01-12| [2]   |   0   |
| v2     |2012-01-11| [1,2] |  -1   |
| v2     |2012-01-13| [1]   |   0   |
+--------+----------+-------+-------+

Мой вопрос похож на этот вопрос , который я задавал ранее, но не совсем совпадает.

Как я могу сделать это, используя PySpark?

Я также открыт для использования пользовательских функций, чтобы получить желаемый результат в случае необходимости.

Приветствуются подходы без UDF и те, которые основаны на UDF. Благодаря.

Ответы [ 2 ]

1 голос
/ 24 апреля 2020

Добавление к @Shu's ответа от предыдущего, просто добавьте к нему предложение when/otherwise, отметив size массива.

df.withColumn("diff", F.when(F.size('list')==2, F.expr("""transform(array(list),x-> x[0]-x[1])""")[0])\
                       .otherwise(F.lit(0))).show()

#+------+----+
#|  list|diff|
#+------+----+
#|[5, 2]|   3|
#|[2, 5]|  -3|
#|   [2]|   0|
#+------+----+
0 голосов
/ 24 апреля 2020

Вы также можете определить udf, например,

Пример данных

data = [
    ('v1', [5, 2],),
    ('v1', [2, 5],),
    ('v1', [3, 2],),
    ('v2', [2],),
    ('v2', [1, 2],),
    ('v2', [1],),
]
df = spark.createDataFrame(data, ['version', 'list'])

Решение

from functools import reduce
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# UDF definition
find_diff = udf(lambda a: reduce(lambda x, y: x - y, a), IntegerType()) 

(
    df.
        withColumn(
            'diff',
            find_diff('list')
        ).
        show(truncate=False)
)

+-------+------+----+                                                           
|version|list  |diff|
+-------+------+----+
|v1     |[5, 2]|3   |
|v1     |[2, 5]|-3  |
|v1     |[3, 2]|1   |
|v2     |[2]   |2   |
|v2     |[1, 2]|-1  |
|v2     |[1]   |1   |
+-------+------+----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...