Какой эффективный способ сделать нечеткие объединения в PySpark?
Я ищу мнения сообщества о масштабируемом подходе к объединению больших фреймов данных Spark при ближайшем ключевом условии.Позвольте мне проиллюстрировать эту проблему на репрезентативном примере.Предположим, у нас есть следующий Spark DataFrame, содержащий события, происходящие в определенный момент времени:
ddf_event = spark.createDataFrame(
data=[
[1, 'A'],
[5, 'A'],
[10, 'B'],
[15, 'A'],
[20, 'B'],
[25, 'B'],
[30, 'A']
],
schema=['ts_event', 'event']
)
, и следующий Spark DataFrame, содержащий GPS-данные, измеренные в определенный момент времени:
ddf_gps = spark.createDataFrame(
data=[
[2, '(-46.84635, 173.13674)'],
[4, '(2.50362, 104.34136)'],
[8, '(-24.20741, 51.80755)'],
[15, '(-59.07798, -20.49141)'],
[18, '(-44.34468, -167.90401)'],
[24, '(-18.84175, 16.68628)'],
[27, '(20.48501,58.42423)']
],
schema=['ts_gps', 'gps_coordinates']
)
, которыймы хотели бы присоединиться, чтобы создать следующий результирующий кадр данных:
+--------+-----+------+-----------------------+
|ts_event|event|ts_gps|gps_coordinates |
+--------+-----+------+-----------------------+
|1 |A |2 |(-46.84635, 173.13674) |
|5 |A |4 |(2.50362, 104.34136) |
|10 |B |8 |(-24.20741, 51.80755) |
|15 |A |15 |(-59.07798, -20.49141) |
|20 |B |18 |(-44.34468, -167.90401)|
|25 |B |24 |(-18.84175, 16.68628) |
|30 |A |27 |(20.48501,58.42423) |
+--------+-----+------+-----------------------+
, эффективно находящий ближайшую точку данных GPS с учетом метки времени события и метки времени данных GPS.
Таким образом, мы сталкиваемся с проблемойобъединение по ближайшему ключевому условию, в котором «ближайший» в данном случае определяется как наименьшая абсолютная разница между временными метками.
Я исследовал два подхода для достижения этой цели: один на основе отфильтрованного двоичного объединения (FBJ) иодин на основе отфильтрованного отсортированного объединения (FSU).Оба подхода более подробно описаны ниже.
Подход FBJ зависит от параметра bin_size
, который ограничивает временное окно, в котором может быть найдена соответствующая временная метка GPS.Увеличение bin_size
увеличивает вычислительную нагрузку, а уменьшение снижает качество результата.
Оба подхода не масштабируются линейно с размером входных кадров данных.
На практике мне приходитсяЯ имею дело с входными данными, состоящими из десятков миллионов строк, поэтому я в настоящее время теряюсь для жизнеспособного решения проблемы.
Подход FBJ
Подход FBJ состоит из следующих шагов:
- Создание столбца
ts_bin
, объединяющего столбцы timestamp
, реализуется с помощью:
bin_size = 10
ddf_event = ddf_event.withColumn(
'ts_bin',
F.round(F.col('ts_event') / bin_size)
)
ddf_gps = ddf_gps.withColumn(
'ts_bin',
F.round(F.col('ts_gps') / bin_size)
)
Присоединение к фреймам данных в столбце
ts_bin
, реализуемое:
ddf = ddf_event.join(ddf_gps, 'ts_bin', 'left_outer')
Определите минимальную разницу меток времени, реализованную с помощью:
from pyspark.sql.window import Window
window = Window.partitionBy('ts_event')
ddf = ddf.withColumn(
'ts_diff',
F.abs(F.col('ts_gps') - F.col('ts_event'))
)
ddf = ddf.withColumn(
'min_ts_diff',
F.min(F.col('ts_diff')).over(window)
)
Отфильтруйте и выберите соответствующие строки и столбцы, используя:
ddf = (
ddf
.where(
(F.col('ts_diff') == F.col('min_ts_diff')) |
(F.col('ts_diff').isNull())
)
.select(
'ts_event',
'event',
'ts_gps',
'gps_coordinates'
)
)
Limit bin_size
ситуаций:
bin_size >> 1
эффективно приводит к полномуперекрестное объединение bin_size = 1
эффективно приводит к левому объединению ts_event == ts_gps
подход FSU
подход FSU состоит из следующих этапов:
- Объединение фреймов данных, реализованное с помощью:
def union(df1, df2):
cols = list(set(df1.columns).union(set(df2.columns)))
for col in cols:
if col not in df1.columns:
df1 = df1.withColumn(col, F.lit(None))
if col not in df2.columns:
df2 = df2.withColumn(col, F.lit(None))
return df1.select(cols).union(df2.select(cols))
ddf_event = ddf_event.withColumn('timestamp', F.col('ts_event'))
ddf_gps = ddf_gps.withColumn('timestamp', F.col('ts_gps'))
ddf = union(ddf_event, ddf_gps)
Сортировка полученного DataFrame и получение временных меток GPS, реализованных с помощью:
from sys import maxsize
last_window = Window.orderBy(
F.col('timestamp').asc()).rowsBetween(-maxsize, 0)
first_window = Window.orderBy(
F.col('timestamp').asc()).rowsBetween(0, maxsize)
ddf = (
ddf.withColumn(
'prev_time',
F.last(F.col('ts_gps'), ignorenulls=True)
.over(last_window)
).withColumn(
'prev_coordinates',
F.last(F.col('gps_coordinates'), ignorenulls=True)
.over(last_window)
).withColumn(
'next_time',
F.first(F.col('ts_gps'), ignorenulls=True)
.over(first_window)
).withColumn(
'next_coordinates',
F.first(F.col('gps_coordinates'), ignorenulls=True)
.over(first_window)
)
)
Отфильтруйте и выберите соответствующие строки и столбцы, используя:
condition = (F.col('timestamp') - F.col('prev_time')
< F.col('next_time') - F.col('timestamp'))
ddf = (
ddf
.where(F.col('event').isNotNull())
.withColumn(
'ts_gps',
F.when(condition | F.col('next_time').isNull(), F.col('prev_time')).otherwise(F.col('next_time'))
).withColumn(
'gps_coordinates',
F.when(condition | F.col('next_time').isNull(),
F.col('prev_coordinates'))
.otherwise(F.col('next_coordinates'))
).select(
'ts_event',
'event',
'ts_gps',
'gps_coordinates'
)
)