С воспроизводимым примером
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType
data = [(1, 1, 0, 0, 2, 0), (0, 0, 0, 2, 0, 1), (0, 0, 1, 0, 0, 1), (2, 0, 1, 0, 0, 0), (0, 0, 0, 0, 0, 0)]
schema = StructType([
StructField("Fib_12", StringType()),
StructField("Fib_13", StringType()),
StructField("Fib_14", StringType()),
StructField("Fib_15", StringType()),
StructField("Fib_16", StringType()),
StructField("Fib_17", StringType()),
])
df = spark.createDataFrame(data, schema=schema)
df
# DataFrame[Fib_12: string, Fib_13: string, Fib_14: string, Fib_15: string, Fib_16: string, Fib_17: string]
Решение с объяснением.
В опубликованном вами примере DataFrame были имена столбцов, такие как FibNN
, тогда как для сопоставления используется строковый шаблон fib_
. Таким образом, я изменил пример данных, чтобы иметь имена столбцов, такие как Fib_NN
и использовать re.match
и шаблон регулярного выражения (?i)fib_
, который не чувствителен к регистру. re.match
возврат True
для положительных регулярных выражений. (?i)
делает строковый шаблон для сопоставления без учета регистра. Если вы все еще получаете значения NULL
, вероятным виновником является то, что все ваши столбцы Fib_NN
содержат значение NULL
.
from pyspark.sql.functions import col, greatest
import re
select_cols = [col(x).cast('long').alias(x) for x in df.columns if re.match('(?i)fib_', x)]
collect_cols = [col(x) for x in df.columns if re.match('(?i)fib_', x)]
result = (
df.
select(*select_cols).
withColumn(
'Fib_max',
greatest(*collect_cols)
)
)
result.show(truncate=False)
+------+------+------+------+------+------+-------+
|Fib_12|Fib_13|Fib_14|Fib_15|Fib_16|Fib_17|Fib_max|
+------+------+------+------+------+------+-------+
|1 |1 |0 |0 |2 |0 |2 |
|0 |0 |0 |2 |0 |1 |2 |
|0 |0 |1 |0 |0 |1 |1 |
|2 |0 |1 |0 |0 |0 |2 |
|0 |0 |0 |0 |0 |0 |0 |
+------+------+------+------+------+------+-------+