Добрый день.
Я пытаюсь выполнить объединение в Pyspark, который использует сложный набор условий для получения одного значения.
Минимальный пример того, чего я пытаюсь достичь может выглядеть следующим образом. Представьте себе набор событий, которые могут происходить в разное время (от t=0
до t=40
). Каждое событие имеет набор из трех независимых логических свойств, которые описывают природу события. Существует некоторое зависящее от времени значение, связанное с срабатыванием каждого свойства, которое содержится в таблице поиска. Для каждого события я хотел бы определить сумму всех соответствующих значений для этого события.
Мой первый кадр данных, df_1
, представляет собой список событий, время, когда событие произошло, и с ним был связан выбор логических свойств:
+-------------+------------+------------+------------+------------+
| EVENT_INDEX | EVENT_TIME | PROPERTY_1 | PROPERTY_2 | PROPERTY_3 |
+-------------+------------+------------+------------+------------+
| Event_1 | 13 | 1 | 0 | 1 |
| Event_2 | 24 | 0 | 1 | 1 |
| Event_3 | 35 | 1 | 0 | 0 |
+-------------+------------+------------+------------+------------+
Второй фрейм данных, df_2
, представляет собой таблицу поиска, которая описывает соответствующее значение, имеющее значение ИСТИНА для определенного свойства в конкретное время. Поскольку существует много повторяющихся значений во всех временных интервалах, формат этого информационного кадра является включающим диапазоном времен, для которого свойство имеет значение c. Временные диапазоны имеют непостоянный размер и могут сильно различаться между различными свойствами:
+------------+----------+---------------+-------+
| START_TIME | END_TIME | PROPERTY_NAME | VALUE |
+------------+----------+---------------+-------+
| 0 | 18 | PROPERTY_1 | 0.1 |
| 19 | 40 | PROPERTY_1 | 0.8 |
| 0 | 20 | PROPERTY_2 | 0.7 |
| 20 | 24 | PROPERTY_2 | 0.3 |
| 25 | 40 | PROPERTY_2 | 0.7 |
| 0 | 40 | PROPERTY_3 | 0.5 |
+------------+----------+---------------+-------+
Желаемый результат: поскольку Event_1
произошло в момент времени t=13
, при срабатывании PROPERTY_1
и PROPERTY_3
ожидаемый сумма значений в соответствии с df_2
должна составлять 0,1 (из сегмента PROPERTY_1
0-18) + 0,5 (из сегмента PROPERTY_3
0-40) = 0,6. Точно так же, Event_2
должно иметь значение 0,3 (помните, что время начала / окончания сегмента включительно, так что это 20–20 сегментов) + 0,5 = 0,8. Наконец, Event_3
= 0,8.
+-------------+------------+------------+------------+------------+-------------+
| EVENT_INDEX | EVENT_TIME | PROPERTY_1 | PROPERTY_2 | PROPERTY_3 | TOTAL_VALUE |
+-------------+------------+------------+------------+------------+-------------+
| Event_1 | 13 | 1 | 0 | 1 | 0.6 |
| Event_2 | 24 | 0 | 1 | 1 | 0.8 |
| Event_3 | 35 | 1 | 0 | 0 | 0.8 |
+-------------+------------+------------+------------+------------+-------------+
Для моего начального тестового набора данных в фрейме данных событий df_1
есть ~ 20 000 событий, распределенных по 2000 временным сегментам. Каждое событие имеет ~ 44 свойства, а длина справочной таблицы df_2
составляет ~ 53 000. Поскольку я хотел бы расширить этот процесс до значительно большего количества данных (возможно, на несколько порядков), я очень заинтересован в параллельном решении этой проблемы. Например, я чувствую, что хочу обобщить df_2
в виде словаря python и транслировать, что моим исполнителям будет невозможно, учитывая объем данных.
Поскольку я пытаюсь добавить один столбец к каждому В строке df_1
я попытался выполнить sh задачу, используя вложенную карту, которая выглядит примерно так:
def calculate_value(df_2):
def _calculate_value(row):
row_dict = row.asDict()
rolling_value = 0.0
for property_name in [key for key in row_dict.keys() if "PROPERTY" in key]:
additional_value = (
df_2
.filter(
(pyspark.sql.functions.col("PROPERTY_NAME") == property_name)
& (pyspark.sql.functions.col("START_BUCKET") <= row_dict["EVENT_TIME"])
& (pyspark.sql.functions.col("END_BUCKET") >= row_dict["EVENT_TIME"])
)
.select("VALUE")
.collect()
)[0][0]
rolling_value += additional_value
return pyspark.sql.Row(**row_dict)
return _calculate_value
Этот код может выполнить объединение для драйвера (запустив calculate_value(df_2)(df_1.rdd.take(1)[0])
), однако, когда я пытаюсь выполнить распараллеленную карту:
(
df_1
.rdd
.map(calculate_value(df_2))
)
Я получаю Py4JError, указывающую, что он не может отделить объект dataframe df_2
. Это проверено в другом месте в StackOverflow, например, Pyspark: PicklingError: Не удалось сериализовать объект: .
Я решил использовать карту, а не объединение, потому что я добавляю один столбец к каждой строке в df_1
, и учитывая сложность в кодировании сложных логик c, необходимых для определения правильных строк в df_2
для суммирования для каждого данного события (сначала проверьте, какие свойства сработали и были ИСТИНА в df_1
, затем выберите эти свойства в df_2
, отмените выбор только для тех свойств и значений, которые соответствуют данному времени события, а затем сложите все события).
Я пытаюсь придумать способ перенастроить df_2
устойчивым, масштабируемым способом, позволяющим более простое объединение / карту, но я не уверен, как лучше go сделать это.
Любой совет будет принят с благодарностью.