Проверяйте данные из одного столбца в разных строках с помощью pyspark - PullRequest
0 голосов
/ 03 июля 2019

Как я могу изменить значение столбца в зависимости от проверки между ячейками? Мне нужно сравнить значения в километрах для каждой записи (id) клиента, чтобы сравнить, является ли запись, которая следует за километром, выше.

fecha      id   estado  id_cliente  error_code  kilometraje error_km
1/1/2019    1     A         1                       10  
2/1/2019    2     A                    ERROR        20  
3/1/2019    1     D         1          ERROR        30
4/1/2019    2     O                                          ERROR

Ошибка в столбце error_km заключается в том, что для клиента (id) 2 значение в километрах меньше, чем в той же записи клиента для 1.02.2009 (если время проходит, автомобиль используется, поэтому километраж увеличивается, так что без ошибок пробег должен быть выше или одинаковым)

Я знаю, что с помощью Column я могу перезаписать или создать столбец, который не существует и который используется, когда я могу задавать условия. Например: это будет код, который я использую для проверки столбцов estado и id_cliente, а ERROR перезаписывает столбец error_code, где это применимо, но я не понимаю, как проверять разные строки для одного и того же клиента.

from pyspark.sql.functions import lit
from pyspark.sql import functions as F
from pyspark.sql.functions import col

file_path = 'archive.txt'

error = 'ERROR'

df = spark.read.parquet(file_path)
df = df.persist(StorageLevel.MEMORY_AND_DISK)
df = df.select('estado', 'id_cliente')
df = df.withColumn("error_code", lit(''))

df = df.withColumn('error_code',
                            F.when((F.col('status') == 'O') &
                                    (F.col('client_id') != '') |
                                    (F.col('status') == 'D') &
                                    (F.col('client_id') != '') |
                                    (F.col('status') == 'A') &
                                    (F.col('client_id') == ''),
                                     F.concat(F.col("error_code"), F.lit(":[{}]".format(error)))
                                   )
                             .otherwise(F.col('error_code')))

Ответы [ 2 ]

2 голосов
/ 04 июля 2019

Это достигается с помощью оконной функции lag .Функция отставания возвращает вам строку перед текущей строкой.При этом вы можете легко сравнить значения в километрах.Посмотрите на код ниже:

import pyspark.sql.functions as F
from pyspark.sql import Window

l = [('1/1/2019' , 1      , 10),
('2/1/2019', 2     , 20  ),
('3/1/2019', 1      , 30  ),
('4/1/2019', 1      , 10  ),
('5/1/2019', 1      , 30  ),
('7/1/2019', 3      , 30  ),
('4/1/2019', 2      , 5)]

columns = ['fecha', 'id', 'kilometraje']

df=spark.createDataFrame(l, columns)
df = df.withColumn('fecha',F.to_date(df.fecha,  'dd/MM/yyyy'))

w = Window.partitionBy('id').orderBy('fecha')

df = df.withColumn('error_km', F.when(F.lag('kilometraje').over(w) > df.kilometraje, F.lit('ERROR') ).otherwise(F.lit('')))

df.show()

Вывод:

+----------+---+-----------+--------+ 
|     fecha| id|kilometraje|error_km| 
+----------+---+-----------+--------+ 
|2019-01-01|  1|         10|        | 
|2019-01-03|  1|         30|        | 
|2019-01-04|  1|         10|   ERROR| 
|2019-01-05|  1|         30|        | 
|2019-01-07|  3|         30|        | 
|2019-01-02|  2|         20|        | 
|2019-01-04|  2|          5|   ERROR| 
+----------+---+-----------+--------+

Четвертая строка не помечена как «ОШИБКА», так как предыдущее значение имело меньшее значение в километрах (10 <30).Если вы хотите пометить все идентификаторы как «ОШИБКА», которые содержат хотя бы одну поврежденную строку, выполните левое соединение. </p>

df.drop('error_km').join(df.filter(df.error_km == 'ERROR').groupby('id').agg(F.first(df.error_km).alias('error_km')), 'id', 'left').show()
0 голосов
/ 04 июля 2019

Я использую .rangeBetween (Window.unboundedPreceding, 0).

Эта функция ищет в текущем значении добавленное значение для обратного

import pyspark
from pyspark.sql.functions import lit
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql import Window
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

error = 'This is error'

l = [('1/1/2019' , 1      , 10),
('2/1/2019', 2     , 20  ),
('3/1/2019', 1      , 30  ),
('4/1/2019', 1      , 10  ),
('5/1/2019', 1      , 22  ),
('7/1/2019', 1      , 23  ),
('22/1/2019', 2      , 5),
('11/1/2019', 2      , 24),
('13/2/2019', 1      , 16),
('14/2/2019', 2      , 18),
('5/2/2019', 1      , 19),
('6/2/2019', 2      , 23),
('7/2/2019', 1      , 14),
('8/3/2019', 1      , 50),
('8/3/2019', 2      , 50)]

columns = ['date', 'vin', 'mileage']

df=spark.createDataFrame(l, columns)
df = df.withColumn('date',F.to_date(df.date,  'dd/MM/yyyy'))
df = df.withColumn("max", lit(0))
df = df.withColumn("error_code", lit(''))

w = Window.partitionBy('vin').orderBy('date').rangeBetween(Window.unboundedPreceding,0)

df = df.withColumn('max',F.max('mileage').over(w))
df = df.withColumn('error_code', F.when(F.col('mileage') < F.col('max'), F.lit('ERROR')).otherwise(F.lit('')))

df.show()

enter image description here

Наконец, осталось только удалить столбец с максимальным значением

df = df.drop('max')
df.show()

enter image description here

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...