Вы можете:
- динамически генерировать строку SQL, Python 3.6+ 'f-строки действительно удобны для этого.
- передать ее в
pyspark.sql.functions.expr
, чтобы сделать из него pyspark.sql.column.Column
.
Для вашего примера что-то вроде этого должно работать:
С учетом схемы s_df
:
root
|-- col1: long (nullable = false)
|-- value: string (nullable = false)
Импорт функций и создание экземпляра вашей коллекции условий:
[...]
from pyspark.sql.functions import col, expr, when
conditions = [
{'column': 'col1', 'operator': '==', 'value': 3},
{'column': 'value', 'operator': '==', 'value': "'aa'"}
]
- С генерацией всего оператора if:
new_df = s_df.withColumn('value', expr(
f"if({conditions[0]['column']}{conditions[0]['operator']}{conditions[0]['value']}"
f" OR {conditions[1]['column']}{conditions[1]['operator']}{conditions[1]['value']},"
"value, 2)")).show()
- Или только с генерацией условия, переданного функции
when
.
new_df = s_df.withColumn('value',when(
expr(
f"{conditions[0]['column']}{conditions[0]['operator']}{conditions[0]['value']}"
f" OR {conditions[1]['column']}{conditions[1]['operator']}{conditions[1]['value']}"
),
col("value")).otherwise(2)).show()