Pyspark: динамически генерировать условие для предложения when () во время выполнения - PullRequest
0 голосов
/ 08 ноября 2019

Я прочитал CSV-файл в pyspark dataframe. Теперь, если я применяю условия в предложении when(), он прекрасно работает, если условия заданы до runtime.

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql.functions import col

sc = SparkContext('local', 'example')
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# Sample content of csv file
# col1,value
# 1,aa
# 2,bbb

s_df = sql_sc.createDataFrame(pandas_df)

new_df = s_df.withColumn('value', functions.when((col("col1") == 2) | (col("value") == "aa"), s_df.value).otherwise(
    2))

new_df.show(truncate=False)

Но мне нужно динамически формировать условия внутри предложения when из списка.

[{'column': 'col1', 'operator': '==', 'value': 2}, {'column': 'value', 'operator': '==', 'value': "aa"}]

Есть ли способ добиться этого?

Заранее спасибо.

1 Ответ

1 голос
/ 08 ноября 2019

Вы можете:

  1. динамически генерировать строку SQL, Python 3.6+ 'f-строки действительно удобны для этого.
  2. передать ее вpyspark.sql.functions.expr, чтобы сделать из него pyspark.sql.column.Column.

Для вашего примера что-то вроде этого должно работать:

С учетом схемы s_df:

root
 |-- col1: long (nullable = false)
 |-- value: string (nullable = false)

Импорт функций и создание экземпляра вашей коллекции условий:

[...]
from pyspark.sql.functions import col, expr, when
conditions = [
    {'column': 'col1', 'operator': '==', 'value':  3}, 
    {'column': 'value', 'operator': '==', 'value': "'aa'"}
]
  • С генерацией всего оператора if:
new_df = s_df.withColumn('value', expr(
    f"if({conditions[0]['column']}{conditions[0]['operator']}{conditions[0]['value']}"
    f" OR {conditions[1]['column']}{conditions[1]['operator']}{conditions[1]['value']},"
    "value, 2)")).show()
  • Или только с генерацией условия, переданного функции when.
new_df = s_df.withColumn('value',when(
    expr(
        f"{conditions[0]['column']}{conditions[0]['operator']}{conditions[0]['value']}"
        f" OR {conditions[1]['column']}{conditions[1]['operator']}{conditions[1]['value']}"
    ),
    col("value")).otherwise(2)).show()
...