У меня есть отсортированный фрейм данных PYSPARK («временная метка» и «корабль» возрастают):
+----------------------+------+
| timestamp | ship |
+----------------------+------+
| 2018-08-01 06:01:00 | 1 |
| 2018-08-01 06:01:30 | 1 |
| 2018-08-01 09:00:00 | 1 |
| 2018-08-01 09:00:00 | 2 |
| 2018-08-01 10:15:43 | 2 |
| 2018-08-01 11:00:01 | 3 |
| 2018-08-01 06:00:13 | 4 |
| 2018-08-01 13:00:00 | 4 |
| 2018-08-13 14:00:00 | 5 |
| 2018-08-13 14:15:03 | 5 |
| 2018-08-13 14:45:08 | 5 |
| 2018-08-13 14:50:00 | 5 |
+-----------------------------+
Я хочу добавить новый столбец в фрейм данных под названием «поездка». Поездка определяется как номер корабля, который отправляется в течение 2 часов с момента начала записи судна в фрейм данных. Если в течение двух часов номер корабля изменится, новый номер рейса должен быть добавлен в столбец «поездка» фрейма данных.
Желаемый результат выглядит так:
+----------------------+------+-------+
| timestamp | ship | trip |
+----------------------+------+-------+
| 2018-08-01 06:01:00 | 1 | 1 | # start new ship number
| 2018-08-01 06:01:30 | 1 | 1 | # still within 2 hours of same ship number
| 2018-08-01 09:00:00 | 1 | 2 | # more than 2 hours of same ship number = new trip
| 2018-08-01 09:00:00 | 2 | 3 | # new ship number = new trip
| 2018-08-01 10:15:43 | 2 | 3 | # still within 2 hours of same ship number
| 2018-08-01 11:00:01 | 3 | 4 | # new ship number = new trip
| 2018-08-01 06:00:13 | 4 | 5 | # new ship number = new trip
| 2018-08-01 13:00:00 | 4 | 6 | # more than 2 hours of same ship number = new trip
| 2018-08-13 14:00:00 | 5 | 7 | # new ship number = new trip
| 2018-08-13 14:15:03 | 5 | 7 | # still within 2 hours of same ship number
| 2018-08-13 14:45:08 | 5 | 7 | # still within 2 hours of same ship number
| 2018-08-13 14:50:00 | 5 | 7 | # still within 2 hours of same ship number
+-----------------------------+-------+
В Pandas это будет сделано как таковое:
dt_trip = 2 # time duration trip per ship (in hours)
total_time = df['timestamp'] - df.groupby('name')['timestamp'].transform('min')
trips = total_time.dt.total_seconds().fillna(0)//(dt_trip*3600)
df['trip'] = df.groupby(['name', trips]).ngroup()+1
Как это будет сделано в PYSPARK?