Этот код должен работать независимо от того, сколько раз package_scan_code=03
встречается для каждого package_id
в кадре данных.Я добавил еще один (123,'LosAngeles','03')
, чтобы продемонстрировать, что -
Шаг 1: Создание кадра данных
values = [(123,'Denver','05'),(123,'LosAngeles','03'),(123,'Dallas','09'),(123,'Vail','02'),(123,'LosAngeles','03'),
(456,'Jacksonville','05'),(456,'Nashville','09'),(456,'Memphis','03')]
df = sqlContext.createDataFrame(values,['package_id','location','package_scan_code'])
Шаг 2: Создание словаряиз package_id
и location
.
df_count = df.where(col('package_scan_code')=='03').groupby('package_id','location').count()
dict_location_scan_code = dict(df_count.rdd.map(lambda x: (x['package_id'], x['location'])).collect())
print(dict_location_scan_code)
{456: 'Memphis', 123: 'LosAngeles'}
Шаг 3: Создание столбца, отображение словаря.
from pyspark.sql.functions import col, create_map, lit
from itertools import chain
mapping_expr = create_map([lit(x) for x in chain(*dict_location_scan_code.items())])
df = df.withColumn('origin', mapping_expr.getItem(col('package_id')))
df.show()
+----------+------------+-----------------+----------+
|package_id| location|package_scan_code| origin|
+----------+------------+-----------------+----------+
| 123| Denver| 05|LosAngeles|
| 123| LosAngeles| 03|LosAngeles|
| 123| Dallas| 09|LosAngeles|
| 123| Vail| 02|LosAngeles|
| 123| LosAngeles| 03|LosAngeles|
| 456|Jacksonville| 05| Memphis|
| 456| Nashville| 09| Memphis|
| 456| Memphis| 03| Memphis|
+----------+------------+-----------------+----------+