Фильтруйте и суммируйте один фрейм данных Pyspark, используя информацию строк из другого фрейма данных Pyspark - PullRequest
0 голосов
/ 10 апреля 2020

У меня есть два разных фрейма данных с именами df и cr, каждый с разными столбцами и содержимым строки, как показано ниже. Я пытаюсь найти количество рейсов на заданном маршруте для данного типа флота между датами установки и удаления, указанными в кадре данных cr, и создать новый столбец с этой суммой. Фрейм данных df содержит информацию о парке, дате, маршруте и подсчете. Я предполагаю необходимость отфильтровывать df от информации, содержащейся в каждом ряду cr. Фильтрация будет осуществляться по флоту, маршруту и ​​диапазону дат. После того, как df является фильтром, счетчик маршрута суммируется и помещается в новый столбец для данной строки в cr, а затем переходит к следующей строке. Обычно я делаю это в python, используя для l oop, но мои кадры данных довольно большие, и в Pyspark зацикливание утомительно.

Моя текущая попытка:

def component_route_normalized(component_route_count, fleet_month_year_count):
    cr=component_route_count
    df=fleet_month_year_count

    temp=df.filter(
         F.col('month-year').between(pd.to_datetime(cr.date_installed),pd.to_datetime(cr.date_removed)) &
         F.col('route') == cr.route &
         F.col('fleet_type') == cr.fmis_fleet_type_code
    )

    cr=cr.withColumn('fleet_route_count', F.sum(temp.route_count))

return cr

пример содержимого файла данных cr:

+-----------+----------------------+------------------------+------------+-------+-------------+---------+-------+--------------------+--------------+------------+
|aircraft_id|nca_part_number_sse001|nca_serial_number_sse001|repair_cycle|  route|part__routing|departure|arrival|fmis_fleet_type_code|date_installed|date_removed|
+-----------+----------------------+------------------------+------------+-------+-------------+---------+-------+--------------------+--------------+------------+
|          1|        25-3246-9-0001|                     341|           8|PVG-EWR|           13|      PVG|    EWR|                 777|    2014-12-16|  2015-12-10|
|          1|        25-3246-9-0001|                     341|           8|EWR-TLV|           34|      EWR|    TLV|                 777|    2014-12-16|  2015-12-10|
|          1|        25-3246-9-0001|                     341|           8|CDG-EWR|            4|      CDG|    EWR|                 777|    2014-12-16|  2015-12-10|
|          1|        25-3246-9-0001|                     341|           8|DEL-EWR|           16|      DEL|    EWR|                 777|    2014-12-16|  2015-12-10|
|          1|        25-3246-9-0001|                     341|           8|EWR-MXP|            3|      EWR|    MXP|                 777|    2014-12-16|  2015-12-10|
|          1|        25-3246-9-0001|                     341|           8|EWR-LHR|            7|      EWR|    LHR|                 777|    2014-12-16|  2015-12-10|
|          1|        25-3246-9-0001|                     341|           8|TLV-EWR|           34|      TLV|    EWR|                 777|    2014-12-16|  2015-12-10|
|          1|        25-3246-9-0001|                     341|           8|NRT-IAH|           15|      NRT|    IAH|                 777|    2014-12-16|  2015-12-10|
|          1|        25-3246-9-0001|                     341|           8|IAH-FRA|            5|      IAH|    FRA|                 777|    2014-12-16|  2015-12-10|
|          1|        25-3246-9-0001|                     341|           8|EWR-CDG|            4|      EWR|    CDG|                 777|    2014-12-16|  2015-12-10|
|          1|        25-3246-9-0001|                     341|           8|BRU-EWR|            8|      BRU|    EWR|                 777|    2014-12-16|  2015-12-10|
|          1|        25-3246-9-0001|                     341|           8|NRT-EWR|           17|      NRT|    EWR|                 777|    2014-12-16|  2015-12-10|
|          1|        25-3246-9-0001|                     341|           8|FRA-EWR|           11|      FRA|    EWR|                 777|    2014-12-16|  2015-12-10|
|          1|        25-3246-9-0001|                     341|           8|EWR-PVG|           14|      EWR|    PVG|                 777|    2014-12-16|  2015-12-10|
+-----------+----------------------+------------------------+------------+-------+-------------+---------+-------+--------------------+--------------+------------+ 

пример содержимого файла данных df:

+----------+-------+----------+-----------+------------+-----------+
|month-year|  route|fleet_type|route_count|flight_month|flight_year|
+----------+-------+----------+-----------+------------+-----------+
|  6/1/2014|PHL-ORD|       737|         92|           6|       2014|
|  4/1/2014|IAH-TUL|       787|         23|           4|       2014|
|  4/1/2014|DFW-ORD|       737|         86|           4|       2014|
|  5/1/2014|BRO-IAH|       737|         33|           5|       2014|
|  4/1/2014|YQR-ORD|       787|          9|           4|       2014|
|  3/1/2014|SFO-IAH|       757|         58|           3|       2014|
|  4/1/2014|AUS-IAH|       BUS|         55|           4|       2014|
|  5/1/2014|AGU-IAH|       787|          1|           5|       2014|
+----------+-------+----------+-----------+------------+-----------+

1 Ответ

0 голосов
/ 10 апреля 2020

A join для типа флота, диапазона дат и маршрута, за которым следуют groupBy и count. Вы можете попробовать что-то вроде этого (вероятно, понадобятся настройки для работы с типами столбцов):

cond =  (F.col('cr.month-year').between(F.col('df.date_installed',F.col('df.date_removed')) &
         (F.col('df.route') == F.col('cr.route')) &
         (F.col('df.fleet_type') == F.col('cr.fmis_fleet_type_code')))

joined = cr.alias('cr').join(df.alias('df'), on=cond, how='inner')
counts = joined.groupBy(*cr.columns).agg(F.sum('route_count').alias('total_route_count'))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...