Сначала давайте найдем максимум диапазона
from pyspark.sql.functions import array_max, col, expr, when
max_adp_range = array_max("ADP_Range")
и ближайшее значение:
closest_adp_range = array_max(expr("""
filter(ADP_Range, y -> y < YearBLT)
"""))
и скомбинируем эти два в одно выражение:
adp_year = when(
# If the YearBLT is greater than the MaxYear, ADP_Year == Max(ADP_Range)
col("YearBLT") > col("MaxYear"), max_adp_range
).when(
# If the YearBLT is in between, it chooses
# the closest date below the YearBLT in the ADP_Range
col("YearBLT").between(col("MinYear"), col("MaxYear")), closest_adp_range
).otherwise(
# If the YearBLT is less than the MinYear, ADP_Year == "NA"
# Note: not required. Included just for clarity.
None
)
Наконец, выберите:
df = spark.createDataFrame([
(164876, 2010, 2004, 2009, [2004,2009]),
(164877, 2008, 2000, 2011, [2000, 2002, 2011]),
(164878, 2000, 2003, 2011, [2003, 2011]),
(164879, 2013, 1999, 2015, [2003, 2007, 2015, 1999])
], ("id", "YearBLT", "MinYear", "MaxYear", "ADP_Range"))
df.withColumn("ADP_YEAR", adp_year).show()
, который должен дать ожидаемый результат:
+------+-------+-------+-------+--------------------+--------+
| id|YearBLT|MinYear|MaxYear| ADP_Range|ADP_YEAR|
+------+-------+-------+-------+--------------------+--------+
|164876| 2010| 2004| 2009| [2004, 2009]| 2009|
|164877| 2008| 2000| 2011| [2000, 2002, 2011]| 2002|
|164878| 2000| 2003| 2011| [2003, 2011]| null|
|164879| 2013| 1999| 2015|[2003, 2007, 2015...| 2007|
+------+-------+-------+-------+--------------------+--------+
Для функций array_max
и filter
высшего порядка требуется Spark 2.4 или более поздняя версия.В версии 2.3 или ранее вы могли переопределить приведенные выше выражения как
from pyspark.sql.functions import udf
max_adp_range = udf(max, "bigint")("ADP_Range")
closest_adp_range = udf(
lambda xs, y: max(x for x in xs if x < y), "bigint"
)("ADP_Range", "YearBLT")
, но вы должны ожидать значительного снижения производительности (один udf
должен быть быстрее, но все же медленнее, чем собственные выражения).