Присоединение к фреймам данных Pyspark с использованием сложной условной логики c (возможно, с использованием карты) - PullRequest
1 голос
/ 10 апреля 2020

Добрый день.

Я пытаюсь выполнить объединение в Pyspark, который использует сложный набор условий для получения одного значения.

Минимальный пример того, чего я пытаюсь достичь может выглядеть следующим образом. Представьте себе набор событий, которые могут происходить в разное время (от t=0 до t=40). Каждое событие имеет набор из трех независимых логических свойств, которые описывают природу события. Существует некоторое зависящее от времени значение, связанное с срабатыванием каждого свойства, которое содержится в таблице поиска. Для каждого события я хотел бы определить сумму всех соответствующих значений для этого события.

Мой первый кадр данных, df_1, представляет собой список событий, время, когда событие произошло, и с ним был связан выбор логических свойств:

+-------------+------------+------------+------------+------------+
| EVENT_INDEX | EVENT_TIME | PROPERTY_1 | PROPERTY_2 | PROPERTY_3 |
+-------------+------------+------------+------------+------------+
|   Event_1   |     13     |     1      |      0     |     1      |
|   Event_2   |     24     |     0      |      1     |     1      |
|   Event_3   |     35     |     1      |      0     |     0      |
+-------------+------------+------------+------------+------------+

Второй фрейм данных, df_2, представляет собой таблицу поиска, которая описывает соответствующее значение, имеющее значение ИСТИНА для определенного свойства в конкретное время. Поскольку существует много повторяющихся значений во всех временных интервалах, формат этого информационного кадра является включающим диапазоном времен, для которого свойство имеет значение c. Временные диапазоны имеют непостоянный размер и могут сильно различаться между различными свойствами:

+------------+----------+---------------+-------+
| START_TIME | END_TIME | PROPERTY_NAME | VALUE |
+------------+----------+---------------+-------+
|      0     |    18    |  PROPERTY_1   |  0.1  |
|     19     |    40    |  PROPERTY_1   |  0.8  |
|      0     |    20    |  PROPERTY_2   |  0.7  |
|     20     |    24    |  PROPERTY_2   |  0.3  |
|     25     |    40    |  PROPERTY_2   |  0.7  |
|      0     |    40    |  PROPERTY_3   |  0.5  |
+------------+----------+---------------+-------+

Желаемый результат: поскольку Event_1 произошло в момент времени t=13, при срабатывании PROPERTY_1 и PROPERTY_3 ожидаемый сумма значений в соответствии с df_2 должна составлять 0,1 (из сегмента PROPERTY_1 0-18) + 0,5 (из сегмента PROPERTY_3 0-40) = 0,6. Точно так же, Event_2 должно иметь значение 0,3 (помните, что время начала / окончания сегмента включительно, так что это 20–20 сегментов) + 0,5 = 0,8. Наконец, Event_3 = 0,8.

+-------------+------------+------------+------------+------------+-------------+
| EVENT_INDEX | EVENT_TIME | PROPERTY_1 | PROPERTY_2 | PROPERTY_3 | TOTAL_VALUE |
+-------------+------------+------------+------------+------------+-------------+
|   Event_1   |     13     |     1      |      0     |     1      |     0.6     |
|   Event_2   |     24     |     0      |      1     |     1      |     0.8     |
|   Event_3   |     35     |     1      |      0     |     0      |     0.8     |
+-------------+------------+------------+------------+------------+-------------+

Для моего начального тестового набора данных в фрейме данных событий df_1 есть ~ 20 000 событий, распределенных по 2000 временным сегментам. Каждое событие имеет ~ 44 свойства, а длина справочной таблицы df_2 составляет ~ 53 000. Поскольку я хотел бы расширить этот процесс до значительно большего количества данных (возможно, на несколько порядков), я очень заинтересован в параллельном решении этой проблемы. Например, я чувствую, что хочу обобщить df_2 в виде словаря python и транслировать, что моим исполнителям будет невозможно, учитывая объем данных.

Поскольку я пытаюсь добавить один столбец к каждому В строке df_1 я попытался выполнить sh задачу, используя вложенную карту, которая выглядит примерно так:

def calculate_value(df_2):
    def _calculate_value(row):
        row_dict = row.asDict()
        rolling_value = 0.0
        for property_name in [key for key in row_dict.keys() if "PROPERTY" in key]:
            additional_value = (
                df_2
                .filter(
                    (pyspark.sql.functions.col("PROPERTY_NAME") == property_name)
                    & (pyspark.sql.functions.col("START_BUCKET") <= row_dict["EVENT_TIME"])
                    & (pyspark.sql.functions.col("END_BUCKET") >= row_dict["EVENT_TIME"])
                )
                .select("VALUE")
                .collect()
            )[0][0]
            rolling_value += additional_value
        return pyspark.sql.Row(**row_dict)
    return _calculate_value

Этот код может выполнить объединение для драйвера (запустив calculate_value(df_2)(df_1.rdd.take(1)[0])), однако, когда я пытаюсь выполнить распараллеленную карту:

(
    df_1
    .rdd
    .map(calculate_value(df_2))
)

Я получаю Py4JError, указывающую, что он не может отделить объект dataframe df_2. Это проверено в другом месте в StackOverflow, например, Pyspark: PicklingError: Не удалось сериализовать объект: .

Я решил использовать карту, а не объединение, потому что я добавляю один столбец к каждой строке в df_1, и учитывая сложность в кодировании сложных логик c, необходимых для определения правильных строк в df_2 для суммирования для каждого данного события (сначала проверьте, какие свойства сработали и были ИСТИНА в df_1, затем выберите эти свойства в df_2, отмените выбор только для тех свойств и значений, которые соответствуют данному времени события, а затем сложите все события).

Я пытаюсь придумать способ перенастроить df_2 устойчивым, масштабируемым способом, позволяющим более простое объединение / карту, но я не уверен, как лучше go сделать это.

Любой совет будет принят с благодарностью.

1 Ответ

0 голосов
/ 11 апреля 2020

Sample DataFrames:

df1.show()
+-----------+----------+----------+----------+----------+
|EVENT_INDEX|EVENT_TIME|PROPERTY_1|PROPERTY_2|PROPERTY_3|
+-----------+----------+----------+----------+----------+
|    Event_1|        13|         1|         0|         1|
|    Event_2|        24|         0|         1|         1|
|    Event_3|        35|         1|         0|         0|
+-----------+----------+----------+----------+----------+

df2.show()
+----------+--------+-------------+-----+
|START_TIME|END_TIME|PROPERTY_NAME|VALUE|
+----------+--------+-------------+-----+
|         0|      18|   PROPERTY_1|  0.1|
|        19|      40|   PROPERTY_1|  0.8|
|         0|      20|   PROPERTY_2|  0.7|
|        20|      24|   PROPERTY_2|  0.3|
|        25|      40|   PROPERTY_2|  0.7|
|         0|      40|   PROPERTY_3|  0.5|
+----------+--------+-------------+-----+

Это работает для Spark2.4+ с использованием DataframeAPI. (очень масштабируемый поскольку он использует только встроенные функции и является динамическим c для стольких столбцов свойств)

Он будет работать для столько же свойств , сколько динамических c для них, если столбцы свойств начинаются с 'PROPERTY_'. Сначала я буду использовать arrays_zip и array и explode, чтобы свернуть все столбцы свойств в строки с двумя столбцами, используя element_at чтобы дать нам PROPERY_NAME,PROPERTY_VALUE. До join я буду фильтровать, чтобы сохранить только те строки, где PROPERY_VALUE=1. Объединение будет происходить в range of time и где PROPERTY (со всеми свернутыми строками свойств) = PROPERTY_NAMES ( из df2 ). Это гарантирует, что мы получим только все строки, необходимые для нашей суммы. Затем я выполняю groupBy с agg, чтобы выбрать все наши необходимые столбцы и получить нашу общую сумму как TOTAL_VALUE.

from pyspark.sql import functions as F
df1.withColumn("PROPERTIES",\
F.explode(F.arrays_zip(F.array([F.array(F.lit(x),F.col(x)) for x in df1.columns if x.startswith("PROPERTY_")]))))\
.select("EVENT_INDEX", "EVENT_TIME","PROPERTIES.*",\
       *[x for x in df1.columns if x.startswith("PROPERTY_")]).withColumn("PROPERTY", F.element_at("0",1))\
                                                    .withColumn("PROPERTY_VALUE", F.element_at("0",2)).drop("0")\
.filter('PROPERTY_VALUE=1').join(df2, (df1.EVENT_TIME>=df2.START_TIME) & (df1.EVENT_TIME<=df2.END_TIME)& \
(F.col("PROPERTY")==df2.PROPERTY_NAME)).groupBy("EVENT_INDEX").agg(F.first("EVENT_TIME").alias("EVENT_TIME"),\
*[F.first(x).alias(x) for x in df1.columns if x.startswith("PROPERTY_")],\
(F.sum("VALUE").alias("TOTAL_VALUE"))).orderBy("EVENT_TIME").show()

+-----------+----------+----------+----------+----------+-----------+
|EVENT_INDEX|EVENT_TIME|PROPERTY_1|PROPERTY_2|PROPERTY_3|TOTAL_VALUE|
+-----------+----------+----------+----------+----------+-----------+
|    Event_1|        13|         1|         0|         1|        0.6|
|    Event_2|        24|         0|         1|         1|        0.8|
|    Event_3|        35|         1|         0|         0|        0.8|
+-----------+----------+----------+----------+----------+-----------+
...