PySpark: фильтрация данных на основе диапазона дат из другого фрейма данных - PullRequest
0 голосов
/ 29 марта 2019

Я пытаюсь выбрать записи из df1 , если df1.date1 лежит между df2.date2 и df2.date3 (разрешены только три диапазона даты2, комбинации даты3, по строкам).

В моем случае нет единой переменной для определения критерия соединения.Я пробовал разные функции pyspark.sql, такие как 'filter', 'when', 'withColumn', 'date_sub', 'date_add' и т. Д., Но не смог найти решение.

Я прошел несколько SO-сообщенийоднако большинство из них предлагают использовать 'join', что может не соответствовать моей проблеме!

df1

+----------+-----------+
|  emp_id  |   date1   |
+----------+-----------+
|   67891  | 11-13-2015|
|   12345  | 02-28-2017|
|   34567  | 04-07-2017|
+----------+-----------+

df2

+------------+------------+
|  date2     |   date3    |
+------------+------------+
|01-28-2017  | 03-15-2017 |
|07-13-2017  | 11-13-2017 |
|06-07-2018  | 09-07-2018 |
+------------+------------+

Ожидаемая запись :

+----------+-----------+
|  emp_id  |   date1   |
+----------+-----------+
|   12345  | 02-28-2017|
+----------+-----------+

1 Ответ

1 голос
/ 29 марта 2019

Вы можете делать неравные соединения в искре. Вам не обязательно нужны соответствующие ключи.

Это в Scala, я почти уверен, что в Python почти то же самое. Дай мне знать, если это не сработает. Также обновит ответ в pyspark.

scala> df1.join(df2 , 'date1 > 'date2 && 'date1 < 'date3).show
    +------+----------+----------+----------+
    |emp_id|     date1|     date2|     date3|
    +------+----------+----------+----------+
    | 12345|02-28-2017|01-28-2017|03-15-2017|
    +------+----------+----------+----------+

Pyspark решение:

>>> from pyspark.sql.functions import unix_timestamp
>>> from pyspark.sql.functions import from_unixtime
>>> x = [(67891 ,'11-13-2015'),(12345, '02-28-2017'),(34567,'04-07-2017')]
>>> df1 = spark.createDataFrame(x,['emp_id','date1'])
>>> y = [('01-28-2017','03-15-2017'),('07-13-2017','11-13-2017'),('06-07-2018','09-07-2018')]
>>> df2 = spark.createDataFrame(y,['date2','date3'])
>>> df1a = df1.select('emp_id', from_unixtime(unix_timestamp('date1', 'MM-dd-yyy')).alias('date1'))
>>> df2a = df2.select(from_unixtime(unix_timestamp('date2', 'MM-dd-yyy')).alias('date2'),from_unixtime(unix_timestamp('date3', 'MM-dd-yyy')).alias('date3'))


>>> df1a.join(df2a, on=[df1a['date1'] > df2a['date2'], df1a['date1'] < df2a['date3']]).show()
+------+-------------------+-------------------+-------------------+
|emp_id|              date1|              date2|              date3|
+------+-------------------+-------------------+-------------------+
| 12345|2017-02-28 00:00:00|2017-01-28 00:00:00|2017-03-15 00:00:00|
+------+-------------------+-------------------+-------------------+
...