Как присоединить метку времени в диапазоне (диапазон не существует) - PullRequest
0 голосов
/ 16 апреля 2020

Я хочу объединить два кадра данных по метке времени столбца df2.join(df1, how='left'). Следующий столбец отметки времени df1 является условием остановки

Данных для присоединения

df1 = spark.createDataFrame(
                        [(1,  110, 'walk',  'work',  '2019-09-28 13:40:00'),
                         (2,  110, 'metro', 'work',  '2019-09-28 14:00:00'),
                         (3,  110, 'walk',  'work',  '2019-09-28 14:02:00'),
                         (4,  120, 'bus',   'home',  '2019-09-28 17:00:00'),
                         (5,  120, 'metro', 'home',  '2019-09-28 17:20:00'),
                         (6,  120, 'walk',  'home',  '2019-09-28 17:45:00')],
                        ['id', 'u_uuid', 'mode', 'place', 'timestamp']
                        )

 df2 = spark.createDataFrame(
                        [(1,  '2019-09-28 13:30:00'),
                         (2,  '2019-09-28 13:35:00'),
                         (3,  '2019-09-28 13:39:00'),
                         (4,  '2019-09-28 13:50:00'),
                         (5,  '2019-09-28 13:55:00'),
                         (6,  '2019-09-28 14:01:00'),
                         (7,  '2019-09-28 16:30:00'),
                         (8,  '2019-09-28 16:40:00'),
                         (9,  '2019-09-28 16:50:00'),
                         (10, '2019-09-28 17:25:00'),
                         (11, '2019-09-28 17:30:00'),
                         (12, '2019-09-28 17:35:00')],
                         ['id', 'timestamp']
                        )

Цель

enter image description here

1 Ответ

1 голос
/ 16 апреля 2020

IIU C, Один из способов сделать это - использовать Window.

import pyspark.sql.functions as f
from pyspark.sql.window import Window
win_spec = Window.orderBy('timestamp')

# Window function without partitionBy has huge impact as it will bring all data into one partition. You might see executor OOM errors.

# Advise to add some partition column if you have big dataset
Window.partitionBy('SOME_COL').orderBy('timestamp')

Теперь добавьте столбец start_timestamp, как показано ниже

df = df1.withColumn('start_timestamp', f.coalesce(f.lag('timestamp').over(win_spec),f.lit('1')))
# df.show()
# +---+------+-----+-----+-------------------+-------------------+
# | id|u_uuid| mode|place|          timestamp|    start_timestamp|
# +---+------+-----+-----+-------------------+-------------------+
# |  1|   110| walk| work|2019-09-28 13:40:00|                  1|
# |  2|   110|metro| work|2019-09-28 14:00:00|2019-09-28 13:40:00|
# |  3|   110| walk| work|2019-09-28 14:02:00|2019-09-28 14:00:00|
# |  4|   120|  bus| home|2019-09-28 17:00:00|2019-09-28 14:02:00|
# |  5|   120|metro| home|2019-09-28 17:20:00|2019-09-28 17:00:00|
# |  6|   120| walk| home|2019-09-28 17:45:00|2019-09-28 17:20:00|
# +---+------+-----+-----+-------------------+-------------------+

Присоединяйтесь df с df2 Используя left присоединяйтесь

df.join(df2, df2['timestamp'].between(df['start_timestamp'], df['timestamp']), 'left')\
   .where(df2['id'].isNotNull())\ # check below
   .select(df['u_uuid'], df['mode'], df['place'], df['timestamp'].alias('df1.timestamp'), df2['timestamp'].alias('df2.timestamp'))\
   .show()

# where clause is just to match goal output,
# there is no entry in df2 for 2019-09-28 17:00:00 to 2019-09-28 17:20:00 range
# Record: 120|metro| home|2019-09-28 17:20:00|2019-09-28 17:00:00
+------+-----+-----+-------------------+-------------------+
|u_uuid| mode|place|      df1.timestamp|      df2.timestamp|
+------+-----+-----+-------------------+-------------------+
|   110| walk| work|2019-09-28 13:40:00|2019-09-28 13:30:00|
|   110| walk| work|2019-09-28 13:40:00|2019-09-28 13:35:00|
|   110| walk| work|2019-09-28 13:40:00|2019-09-28 13:39:00|
|   110|metro| work|2019-09-28 14:00:00|2019-09-28 13:50:00|
|   110|metro| work|2019-09-28 14:00:00|2019-09-28 13:55:00|
|   110| walk| work|2019-09-28 14:02:00|2019-09-28 14:01:00|
|   120|  bus| home|2019-09-28 17:00:00|2019-09-28 16:30:00|
|   120|  bus| home|2019-09-28 17:00:00|2019-09-28 16:40:00|
|   120|  bus| home|2019-09-28 17:00:00|2019-09-28 16:50:00|
|   120| walk| home|2019-09-28 17:45:00|2019-09-28 17:25:00|
|   120| walk| home|2019-09-28 17:45:00|2019-09-28 17:30:00|
|   120| walk| home|2019-09-28 17:45:00|2019-09-28 17:35:00|
+------+-----+-----+-------------------+-------------------+

В качестве альтернативы , вы можете использовать right присоединиться, чтобы избежать, где Cluase. Решить на основе df1 и df2 размера.

df.join(df2, df2['timestamp'].between(df['start_timestamp'], df['timestamp']), 'right')\
   .select(df['u_uuid'], df['mode'], df['place'], df['timestamp'].alias('df1.timestamp'), df2['timestamp'].alias('df2.timestamp'))\
   .show()
...