Присоединение Spark DataFrames к ближайшему ключевому условию - PullRequest
5 голосов
/ 24 сентября 2019

Какой эффективный способ сделать нечеткие объединения в PySpark?

Я ищу мнения сообщества о масштабируемом подходе к объединению больших фреймов данных Spark при ближайшем ключевом условии.Позвольте мне проиллюстрировать эту проблему на репрезентативном примере.Предположим, у нас есть следующий Spark DataFrame, содержащий события, происходящие в определенный момент времени:

ddf_event = spark.createDataFrame(
    data=[
        [1, 'A'],
        [5, 'A'],
        [10, 'B'],
        [15, 'A'],
        [20, 'B'],
        [25, 'B'],
        [30, 'A']
    ],
    schema=['ts_event', 'event']
)

, и следующий Spark DataFrame, содержащий GPS-данные, измеренные в определенный момент времени:

ddf_gps = spark.createDataFrame(
    data=[
        [2, '(-46.84635, 173.13674)'],
        [4, '(2.50362, 104.34136)'],
        [8, '(-24.20741, 51.80755)'],
        [15, '(-59.07798, -20.49141)'],
        [18, '(-44.34468, -167.90401)'],
        [24, '(-18.84175, 16.68628)'],
        [27, '(20.48501,58.42423)']
    ],
    schema=['ts_gps', 'gps_coordinates']
)

, которыймы хотели бы присоединиться, чтобы создать следующий результирующий кадр данных:

+--------+-----+------+-----------------------+
|ts_event|event|ts_gps|gps_coordinates        |
+--------+-----+------+-----------------------+
|1       |A    |2     |(-46.84635, 173.13674) |
|5       |A    |4     |(2.50362, 104.34136)   |
|10      |B    |8     |(-24.20741, 51.80755)  |
|15      |A    |15    |(-59.07798, -20.49141) |
|20      |B    |18    |(-44.34468, -167.90401)|
|25      |B    |24    |(-18.84175, 16.68628)  |
|30      |A    |27    |(20.48501,58.42423)    |
+--------+-----+------+-----------------------+

, эффективно находящий ближайшую точку данных GPS с учетом метки времени события и метки времени данных GPS.

Таким образом, мы сталкиваемся с проблемойобъединение по ближайшему ключевому условию, в котором «ближайший» в данном случае определяется как наименьшая абсолютная разница между временными метками.

Я исследовал два подхода для достижения этой цели: один на основе отфильтрованного двоичного объединения (FBJ) иодин на основе отфильтрованного отсортированного объединения (FSU).Оба подхода более подробно описаны ниже.

Подход FBJ зависит от параметра bin_size, который ограничивает временное окно, в котором может быть найдена соответствующая временная метка GPS.Увеличение bin_size увеличивает вычислительную нагрузку, а уменьшение снижает качество результата.

Оба подхода не масштабируются линейно с размером входных кадров данных.

На практике мне приходитсяЯ имею дело с входными данными, состоящими из десятков миллионов строк, поэтому я в настоящее время теряюсь для жизнеспособного решения проблемы.

Подход FBJ

Подход FBJ состоит из следующих шагов:

  1. Создание столбца ts_bin, объединяющего столбцы timestamp, реализуется с помощью:
bin_size = 10
ddf_event = ddf_event.withColumn(
    'ts_bin',
    F.round(F.col('ts_event') / bin_size)
)

ddf_gps = ddf_gps.withColumn(
    'ts_bin',
    F.round(F.col('ts_gps') / bin_size)
)
Присоединение к фреймам данных в столбце ts_bin, реализуемое:
ddf = ddf_event.join(ddf_gps, 'ts_bin', 'left_outer')
Определите минимальную разницу меток времени, реализованную с помощью:
from pyspark.sql.window import Window

window = Window.partitionBy('ts_event')

ddf = ddf.withColumn(
    'ts_diff',
    F.abs(F.col('ts_gps') - F.col('ts_event'))
)

ddf = ddf.withColumn(
    'min_ts_diff',
    F.min(F.col('ts_diff')).over(window)
)
Отфильтруйте и выберите соответствующие строки и столбцы, используя:
ddf = (
    ddf
    .where(
        (F.col('ts_diff') == F.col('min_ts_diff')) |
        (F.col('ts_diff').isNull())   
    )
    .select(
        'ts_event',
        'event',
        'ts_gps',
        'gps_coordinates'
    )
)

Limit bin_size ситуаций:

  • bin_size >> 1 эффективно приводит к полномуперекрестное объединение
  • bin_size = 1 эффективно приводит к левому объединению ts_event == ts_gps

подход FSU

подход FSU состоит из следующих этапов:

  1. Объединение фреймов данных, реализованное с помощью:
def union(df1, df2):
    cols = list(set(df1.columns).union(set(df2.columns)))
    for col in cols:
        if col not in df1.columns:
            df1 = df1.withColumn(col, F.lit(None))
        if col not in df2.columns:
            df2 = df2.withColumn(col, F.lit(None))
    return df1.select(cols).union(df2.select(cols))

ddf_event = ddf_event.withColumn('timestamp', F.col('ts_event'))
ddf_gps = ddf_gps.withColumn('timestamp', F.col('ts_gps'))
ddf = union(ddf_event, ddf_gps)
Сортировка полученного DataFrame и получение временных меток GPS, реализованных с помощью:
from sys import maxsize

last_window = Window.orderBy(
    F.col('timestamp').asc()).rowsBetween(-maxsize, 0)
first_window = Window.orderBy(
    F.col('timestamp').asc()).rowsBetween(0, maxsize)

ddf = (
    ddf.withColumn(
        'prev_time',
        F.last(F.col('ts_gps'), ignorenulls=True)
         .over(last_window)
    ).withColumn(
        'prev_coordinates',
        F.last(F.col('gps_coordinates'), ignorenulls=True)
         .over(last_window)
    ).withColumn(
        'next_time',
        F.first(F.col('ts_gps'), ignorenulls=True)
         .over(first_window)
    ).withColumn(
        'next_coordinates',
        F.first(F.col('gps_coordinates'), ignorenulls=True)
         .over(first_window)
    )
)
Отфильтруйте и выберите соответствующие строки и столбцы, используя:
condition = (F.col('timestamp') - F.col('prev_time')
             < F.col('next_time') - F.col('timestamp'))

ddf = (
    ddf
    .where(F.col('event').isNotNull())
    .withColumn(
        'ts_gps',
        F.when(condition | F.col('next_time').isNull(), F.col('prev_time')).otherwise(F.col('next_time'))
    ).withColumn(
        'gps_coordinates',
        F.when(condition | F.col('next_time').isNull(),
               F.col('prev_coordinates'))
         .otherwise(F.col('next_coordinates'))
    ).select(
        'ts_event',
        'event',
        'ts_gps',
        'gps_coordinates'
    )
)

1 Ответ

2 голосов
/ 24 сентября 2019

То, что вы ищете, это временное соединение .Проверьте библиотеку Spark временных рядов Flint (ранее HuoHua, Spark на китайском языке): https://github.com/twosigma/flint

Используя эту библиотеку, для 2 заданных временных рядов временных рядов (документация объясняет эти объекты), вы можете выполнить в PySpark (или Scala Spark):

ddf_event = ...
ddf_gps = ...
result = ddf_event.leftJoin(ddf_gps, tolerance = "1day")

Ваши метки времени не были четкими, поэтому установите допуск в соответствии с вашими потребностями.Вы также можете сделать «будущие объединения», если это необходимо.

Ознакомьтесь с их презентацией Spark Summit для получения дополнительных объяснений и примеров: https://youtu.be/g8o5-2lLcvQ

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...