Вам нужно присоединиться к поездке в оба конца с одиночными поездками дважды.Ваши ключи присоединения - Market
, date
и time
.Рынки туда и обратно должны быть разделены на 6-символьные коды, чтобы соответствовать рынкам одиночных поездок:
Сначала давайте разделим информационный кадр на отдельные поездки и туда и обратно:
import pyspark.sql.functions as psf
single, roundtrip = [df.filter(psf.col('ttype') == i).drop('ttype') for i in [1, 2]]
Чтобы извлечь исходящие и входящие рынки длятуда и обратно, мы просто используем substring
:
roundtrip = roundtrip \
.withColumn('outMarket', psf.substring('Market', 0, 6)) \
.withColumn('inMarket', psf.substring('Market', 7, 6))
Теперь мы можем присоединиться дважды (для исходящих и входящих):
single = single \
.drop('intime') \
.withColumnRenamed('outtime', 'time') \
.withColumnRenamed('Price', 'bound')
single.persist()
for bound in ['out', 'in']:
roundtrip = roundtrip \
.join(
single.select([psf.col(c).alias(bound + c) for c in single.columns if c != 'date'] + ['date']),
on=[bound + c for c in ['Market', 'time']] + ['date'], how='left')
roundtrip.show()
+--------+------+--------+---------+-------+------------+-----+--------+-------+
|inMarket|intime| date|outMarket|outtime| Market|Price|outbound|inbound|
+--------+------+--------+---------+-------+------------+-----+--------+-------+
| JFKATL| 0600|20190403| ATLJFK| 0215|ATLJFKJFKATL| 150| 77| 88|
+--------+------+--------+---------+-------+------------+-----+--------+-------+