PySpark новый столбец, который выбирает значение из списка целых чисел - PullRequest
1 голос
/ 13 июня 2019

У меня есть следующий фрейм данных в PySpark:

|ID    |YearBLT|MinYear|MaxYear|ADP_Range               |
---------------------------------------------------------
|164876|2010   |2004   |2009   |[2004,2009]             |
|164877|2008   |2000   |2011   |[2000, 2002, 2011]      |
|164878|2000   |2003   |2011   |[2003, 2011]            |
|164879|2013   |1999   |2015   |[2003, 2007, 2015, 1999]|

Где YearBLT - это год, когда было построено свойство, ADP_Range представляет годы, в которые были обновлены строительные нормы, а MinYear / MaxYear представляют минимальные и максимальные годы диапазона ADP.

Я пытаюсь добавить столбец (ADP_Year) с наиболее подходящим строительным кодом со следующей логикой:

  • Если YearBLT меньше MinYear, ADP_Year == "NA"
  • Если YearBLT больше, чем MaxYear, ADP_Year == Max (ADP_Range)
  • Если YearBLT находится между ними, он выбирает ближайшую дату ниже YearBLT в ADP_Range

Ожидаемый результат будет следующим:

|ID    |YearBLT|MinYear|MaxYear|ADP_Range               |ADP_Year|
------------------------------------------------------------------
|164876|2010   |2004   |2009   |[2004,2009]             |2009    |
|164877|2008   |2000   |2011   |[2000, 2002, 2011]      |2002    |
|164878|2000   |2003   |2011   |[2003, 2011]            |NA      |
|164879|2013   |1999   |2015   |[2003, 2007, 2015, 1999]|2007    |

2010> MaxYear, поэтому он выбирает значение из MaxYear,

2008 год - между 2000 и 2011 годами; поскольку существует третье значение 2002 года, оно выбрано более поздним, чем 2000

2000

2013 год - между 1999 и 2015 годами; поскольку есть третье и четвертое значения 2007 и 2015 годов, выбирается 2007

Первые два случая просты, и у меня есть рабочий код для них:

dfADP = dfADP.withColumn("ADP_Year",when(dfADP['YearBLT'] < dfADP['MinYear'], lit("NA")\
.when(dfADP['YearBLT'] > dfADP['MaxYear'],dfADP['MaxYear'])))

Я крутлю на этом свои колеса и буду рад некоторым советам, если это вообще возможно.

1 Ответ

2 голосов
/ 13 июня 2019

Сначала давайте найдем максимум диапазона

from pyspark.sql.functions import array_max, col, expr, when

max_adp_range = array_max("ADP_Range")

и ближайшее значение:

closest_adp_range = array_max(expr("""
    filter(ADP_Range, y -> y < YearBLT)
"""))

и скомбинируем эти два в одно выражение:

adp_year = when(
    # If the YearBLT is greater than the MaxYear, ADP_Year == Max(ADP_Range)
    col("YearBLT") > col("MaxYear"), max_adp_range
).when(
    # If the YearBLT is in between, it chooses 
    # the closest date below the YearBLT in the ADP_Range
    col("YearBLT").between(col("MinYear"), col("MaxYear")), closest_adp_range
).otherwise(
   # If the YearBLT is less than the MinYear, ADP_Year == "NA"
   # Note: not required. Included just for clarity.
   None
)

Наконец, выберите:

df = spark.createDataFrame([                                    
    (164876, 2010, 2004, 2009, [2004,2009]),
    (164877, 2008, 2000, 2011, [2000, 2002, 2011]),   
    (164878, 2000, 2003, 2011, [2003, 2011]),         
    (164879, 2013, 1999, 2015, [2003, 2007, 2015, 1999])
], ("id", "YearBLT", "MinYear", "MaxYear", "ADP_Range"))

df.withColumn("ADP_YEAR", adp_year).show()

, который должен дать ожидаемый результат:

+------+-------+-------+-------+--------------------+--------+
|    id|YearBLT|MinYear|MaxYear|           ADP_Range|ADP_YEAR|
+------+-------+-------+-------+--------------------+--------+
|164876|   2010|   2004|   2009|        [2004, 2009]|    2009|
|164877|   2008|   2000|   2011|  [2000, 2002, 2011]|    2002|
|164878|   2000|   2003|   2011|        [2003, 2011]|    null|
|164879|   2013|   1999|   2015|[2003, 2007, 2015...|    2007|
+------+-------+-------+-------+--------------------+--------+

Для функций array_max и filter высшего порядка требуется Spark 2.4 или более поздняя версия.В версии 2.3 или ранее вы могли переопределить приведенные выше выражения как

from pyspark.sql.functions import udf

max_adp_range = udf(max, "bigint")("ADP_Range")
closest_adp_range = udf(
    lambda xs, y: max(x for x in xs if x < y), "bigint"
)("ADP_Range", "YearBLT")

, но вы должны ожидать значительного снижения производительности (один udf должен быть быстрее, но все же медленнее, чем собственные выражения).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...