Найти разницу между двумя значениями столбца с помощью поиска и условия - PullRequest
3 голосов
/ 22 мая 2019

В pyspark у меня есть такой фрейм данных, как показано ниже, в котором строки сортируются по идентификатору и значению k1. Кроме того, каждому ряду присваивается уникальный восходящий номер (rowid).

-----------------------
rowid | id | k1  | k2 |
-----------------------
1     | 1  | v1 | l1  |
2     | 1  | v1 | v1  |
3     | 1  | v1 | l2  |
4     | 2  | v2 | v2  |
5     | 2  | v2 | l3  |
6     | 3  | v3 | l3  |
----------------------

Для каждого уникального значения id я хочу вычислить разницу между rowid первой строки, в которой k1 == k2, и rowid, соответствующим первой строке, в которой наблюдается запись с id + 1, и сохранить результаты в новом столбце (т.е. ранг). Вывод должен выглядеть следующим образом.

----------------
 id | k1  |rank |
-----------------
 1  | v1  | 2   |
 2  | v2  | 1   |
 3  | v3  | 0   | 
-----------------

например, для id = 1 значение k1 == k2, когда rowid = 2. Первый раз, когда id = 1, наблюдался, когда rowid = 1. Поместите 2-1 + 1 = 2 в ранг столбца. Для id = 3 у нас нет записи, в которой совпадают значения столбцов k1 и k2. Поэтому заполните столбец рейтинга 0 (или ноль).

Я предполагаю, что это включает groupBy на основе идентификатора, но я не уверен, как получить индекс, соответствующий строке, в которой совпадают столбцы k1 и k2, и первый идентификатор строки, соответствующий каждому уникальному идентификатору.

Ответы [ 2 ]

1 голос
/ 22 мая 2019

Вы можете сделать это, используя функции API с groupBy на id и k1, что должно быть на быстрее, чем с использованием udf:

import pyspark.sql.functions as f

df.groupBy("id", "k1")\
    .agg(
        f.min(f.when(f.col("k1")==f.col("k2"), f.col("rowid"))).alias("first_equal"),
        f.min("rowid").alias("first_row")
    )\
    .select("id", "k1", (f.col("first_equal")-f.col("first_row")+1).alias("rank"))\
    .fillna(0)\
    .show()
#+---+---+----+
#| id| k1|rank|
#+---+---+----+
#|  1| v1|   2|
#|  2| v2|   1|
#|  3| v3|   0|
#+---+---+----+

Вычисление rank можно разбить на два этапа агрегирования:

  • Первая агрегация принимает мин rowid, для которого k1==k2 для каждой пары id, k1.
  • Вторая агрегация занимает минимум rowid по каждой паре id, k1.

Вы берете разницу (+1 согласно вашим требованиям) и, наконец, заполняете любые значения null 0.


Обновление : альтернативный способ использования row_number:

from pyspark.sql import Window

# you can define your own order by column
w = Window.partitionBy("id", "k1").orderBy("rowid")

df.withColumn("rank", f.when(f.expr("k1 = k2"), f.row_number().over(w)))\
    .groupBy("id", "k1")\
    .agg(f.min("rank"))\
    .fillna(0)\
    .show()
# Same as above
1 голос
/ 22 мая 2019

Первое создание примера кадра данных,

import pyspark.sql.functions as F
from pyspark.sql.types import *

df = sql.createDataFrame([
            (1, 1, 'v1' , 'l1'),
            (2, 1, 'v1' , 'v1'),
            (3, 1, 'v1' , 'l2'),
            (4, 2, 'v2' , 'v2'),
            (5, 2, 'v2' , 'l3'),
            (6, 3, 'v3' , 'l3'),
            ],[
            'rowid', 'id', 'k1', 'k2'])

Затем создайте udf и примените его к столбцам,

def get_rank_udf(rows):
    rows = sorted(rows, key=lambda x: x['rowid'])
    first_row_id = rows[0]['rowid']
    for _r in rows:
        if _r['k1'] == _r['k2']:
            equal_row_id = _r['rowid']
            break
        else:
            equal_row_id = None

    if equal_row_id is None:
        return 0
    return equal_row_id - first_row_id + 1

get_rank = F.udf(lambda x: get_rank_udf(x), IntegerType())

df = df.groupby('id', 'k1').agg(F.collect_list(F.struct('rowid', 'k1', 'k2')).alias('elements'))\
       .withColumn('rank', get_rank(F.col('elements')))\
       .select('id', 'k1', 'rank')

Это дает вывод,

+---+---+----+                                                                  
| id| k1|rank|
+---+---+----+
|  1| v1|   2|
|  2| v2|   1|
|  3| v3|   0|
+---+---+----+
...