Pyspark: использование collect_list над window () с условием - PullRequest
1 голос
/ 28 апреля 2020

У меня есть следующие тестовые данные:

import pandas as pd
import datetime

data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05', '2014-01-06'],
     'customerid': [2, 2, 2, 3, 4, 3], 'names': ['Andrew', 'Pete', 'Sean', 'Steve', 'Ray', 'Stef'], 'PaymentType': ['OI', 'CC', 'CC', 'OI', 'OI', 'OI']}
data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])

Следующий код дает мне список имен с совпадающими значениями customerid в течение двухдневного периода:

import pandas as pd
import datetime

from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql import SparkSession


spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "500g") \
    .appName('my-pandasToSparkDF-app') \
    .config("spark.ui.showConsoleProgress", "false")\
    .getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', 50000)
spark.sparkContext.setLogLevel("OFF")


data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05', '2014-01-06'],
     'customerid': [2, 2, 2, 3, 4, 3], 'names': ['Andrew', 'Pete', 'Sean', 'Steve', 'Ray', 'Stef'], 'PaymentType': ['OI', 'CC', 'CC', 'OI', 'OI', 'OI']}
data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])
spark_data= spark.createDataFrame(data)

win = Window().partitionBy('customerid').orderBy((F.col('date')).cast("long")).rangeBetween(
        -(2*86400), Window.currentRow)

result_frame = spark_data.withColumn("names_array", F.collect_list('names').over(win)).sort(F.col("date").asc())

pd_result_frame = result_frame.toPandas()

Данные:

<code><pre>
date      |customerid|PaymentType|names 
2014-01-01|2         |OI         |Andrew
2014-01-02|2         |CC         |Pete 
2014-01-03|2         |CC         |Sean
2014-01-04|3         |OI         |Steve
2014-01-05|4         |OI         |Ray
2014-01-06|3         |OI         |Stef

Результирующая таблица:

<code><pre>
date      |customerid|PaymentType|names_array|
2014-01-01|2         |OI         |['Andrew']
2014-01-02|2         |CC         |['Andrew', 'Pete'] 
2014-01-03|2         |CC         |['Andrew', 'Pete', 'Sean']
2014-01-04|3         |OI         |['Steve']
2014-01-05|4         |OI         |['Ray']
2014-01-06|3         |OI         |['Steve', 'Stef']

Теперь я хотел бы ввести условие для F.collect_list. В списки должны быть включены только те имена, для которых PaymentType == 'OI'.

В конце таблица должна выглядеть следующим образом:

<code><pre>
date      |customerid|PaymentType|names_array|
2014-01-01|2         |OI         |['Andrew']
2014-01-02|2         |CC         |['Andrew'] 
2014-01-03|2         |CC         |['Andrew']
2014-01-04|3         |OI         |['Steve']
2014-01-05|4         |OI         |['Ray']
2014-01-06|3         |OI         |['Steve', 'Stef']

Спасибо!

1 Ответ

0 голосов
/ 28 апреля 2020

Вы можете поместить предложение when/otherwise в свой collect_list для сбора только тогда, когда PaymentType is 'OI', otherwise collect None.

spark_data.withColumn("names_array",\
                      F.collect_list(F.when(F.col("PaymentType")=='OI',F.col("names"))\
                      .otherwise(F.lit(None))).over(win)).sort(F.col("date").asc()).show()

#+-------------------+----------+------+-----------+-------------+
#|               date|customerid| names|PaymentType|  names_array|
#+-------------------+----------+------+-----------+-------------+
#|2014-01-01 00:00:00|         2|Andrew|         OI|     [Andrew]|
#|2014-01-02 00:00:00|         2|  Pete|         CC|     [Andrew]|
#|2014-01-03 00:00:00|         2|  Sean|         CC|     [Andrew]|
#|2014-01-04 00:00:00|         3| Steve|         OI|      [Steve]|
#|2014-01-05 00:00:00|         4|   Ray|         OI|        [Ray]|
#|2014-01-06 00:00:00|         3|  Stef|         OI|[Steve, Stef]|
#+-------------------+----------+------+-----------+-------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...