Как объединить динамически именованные столбцы в словарь? - PullRequest
3 голосов
/ 08 марта 2019

Учитывая эти кадры данных:

IncomingCount
-------------------------
Venue|Date    | 08 | 10 |
-------------------------
Hotel|20190101| 15 | 03 |
Beach|20190101| 93 | 45 |

OutgoingCount
-------------------------
Venue|Date    | 07 | 10 | 
-------------------------
Beach|20190101| 30 | 5  |
Hotel|20190103| 05 | 15 |

Как я могу объединить (полное объединение) две таблицы, что приведет к чему-то следующему без необходимости вручную циклически проходить по каждой строке обеих таблиц?

Dictionary:
[
 {"Venue":"Hotel", "Date":"20190101", "08":{ "IncomingCount":15 }, "10":{ "IncomingCount":03 } },
 {"Venue":"Beach", "Date":"20190101", "07":{ "OutgoingCount":30 }, "08":{ "IncomingCount":93 }, "10":{ "IncomingCount":45, "OutgoingCount":15 } },
 {"Venue":"Hotel", "Date":"20190103", "07":{ "OutgoingCount":05 }, "10":{ "OutgoingCount":15 } }
]

Условия:

  1. Столбцы Место и Дата действуют как условия соединения.
  2. Другие столбцы, представленные в числах, создаются динамически.
  3. Если динамически столбец не существует, он исключается (или включается со значением None в качестве значения).

Ответы [ 3 ]

1 голос
/ 08 марта 2019

это довольно неудобно, но это можно сделать, используя функцию create_map от spark.

в основном делит столбцы на четыре группы: ключи (место проведения, дата), общие (10),только входящие (08), только исходящие (07).

, затем создают сопоставители для группы (кроме ключей), сопоставляя только то, что доступно для группы.Примените сопоставление, удалите старый столбец и переименуйте сопоставленный столбец в старое имя.

наконец преобразуйте все строки в dict (из rdd df) и соберите.

from pyspark.sql import SparkSession
from pyspark.sql.functions import create_map, col, lit

spark = SparkSession.builder.appName('hotels_and_beaches').getOrCreate()

incoming_counts = spark.createDataFrame([('Hotel', 20190101, 15, 3), ('Beach', 20190101, 93, 45)], ['Venue', 'Date', '08', '10']).alias('inc')
outgoing_counts = spark.createDataFrame([('Beach', 20190101, 30, 5), ('Hotel', 20190103, 5, 15)], ['Venue', 'Date', '07', '10']).alias('out')

df = incoming_counts.join(outgoing_counts, on=['Venue', 'Date'], how='full')

outgoing_cols = {c for c in outgoing_counts.columns if c not in {'Venue', 'Date'}}
incoming_cols = {c for c in incoming_counts.columns if c not in {'Venue', 'Date'}}

common_cols = outgoing_cols.intersection(incoming_cols)

outgoing_cols = outgoing_cols.difference(common_cols)
incoming_cols = incoming_cols.difference(common_cols)

for c in common_cols:
    df = df.withColumn(
        c + '_new', create_map(
            lit('IncomingCount'), col('inc.{}'.format(c)),
            lit('OutgoingCount'), col('out.{}'.format(c)),
        )
    ).drop(c).withColumnRenamed(c + '_new', c)

for c in incoming_cols:
    df = df.withColumn(
        c + '_new', create_map(
            lit('IncomingCount'), col('inc.{}'.format(c)),
        )
    ).drop(c).withColumnRenamed(c + '_new', c)

for c in outgoing_cols:
    df = df.withColumn(
        c + '_new', create_map(
            lit('OutgoingCount'), col('out.{}'.format(c)),
        )
    ).drop(c).withColumnRenamed(c + '_new', c)

result = df.coalesce(1).rdd.map(lambda r: r.asDict()).collect()
print(result)

результат:

[{'Venue': 'Hotel', 'Date': 20190101, '10': {'OutgoingCount': None, 'IncomingCount': 3}, '08': {'IncomingCount': 15}, '07': {'OutgoingCount': None}}, {'Venue': 'Hotel', 'Date': 20190103, '10': {'OutgoingCount': 15, 'IncomingCount': None}, '08': {'IncomingCount': None}, '07': {'OutgoingCount': 5}}, {'Venue': 'Beach', 'Date': 20190101, '10': {'OutgoingCount': 5, 'IncomingCount': 45}, '08': {'IncomingCount': 93}, '07': {'OutgoingCount': 30}}]
1 голос
/ 08 марта 2019

Я могу получить это до сих пор:

import pandas as pd
import numpy as np

dd1 = {'venue': ['hotel', 'beach'], 'date':['20190101', '20190101'], '08': [15, 93], '10':[3, 45]}
dd2 = {'venue': ['beach', 'hotel'], 'date':['20190101', '20190103'], '07': [30, 5], '10':[5, 15]}

df1 = pd.DataFrame(data=dd1)
df2 = pd.DataFrame(data=dd2)

df1.columns = [f"IncomingCount:{x}" if x not in ['venue', 'date'] else x for x in df1.columns]
df2.columns = [f"OutgoingCount:{x}" if x not in ['venue', 'date'] else x for x in df2.columns ]

ll_dd = pd.merge(df1, df2, on=['venue', 'date'], how='outer').to_dict('records')
ll_dd = [{k:v for k,v in dd.items() if not pd.isnull(v)} for dd in ll_dd]

ВЫХОД:

[{'venue': 'hotel',
  'date': '20190101',
  'IncomingCount:08': 15.0,
  'IncomingCount:10': 3.0},
 {'venue': 'beach',
  'date': '20190101',
  'IncomingCount:08': 93.0,
  'IncomingCount:10': 45.0,
  'OutgoingCount:07': 30.0,
  'OutgoingCount:10': 5.0},
 {'venue': 'hotel',
  'date': '20190103',
  'OutgoingCount:07': 5.0,
  'OutgoingCount:10': 15.0}]
0 голосов
/ 09 марта 2019

Окончательный результат по желанию OP - это list из dictionaries, где все строки в кадре данных, которые имеют одинаковые Venue и Date, были объединены.

# Creating the DataFrames
df_Incoming = sqlContext.createDataFrame([('Hotel','20190101',15,3),('Beach','20190101',93,45)],('Venue','Date','08','10'))
df_Incoming.show()
+-----+--------+---+---+
|Venue|    Date| 08| 10|
+-----+--------+---+---+
|Hotel|20190101| 15|  3|
|Beach|20190101| 93| 45|
+-----+--------+---+---+
df_Outgoing = sqlContext.createDataFrame([('Beach','20190101',30,5),('Hotel','20190103',5,15)],('Venue','Date','07','10'))
df_Outgoing.show()
+-----+--------+---+---+
|Venue|    Date| 07| 10|
+-----+--------+---+---+
|Beach|20190101| 30|  5|
|Hotel|20190103|  5| 15|
+-----+--------+---+---+

Идея состоит в том, чтобы создать dictionary из каждого row и сохранить все rows из DataFrame в виде словарей в одном большом list. И в качестве последнего шага мы объединяем вместе те словари, которые имеют одинаковые Venue и Date.

Поскольку все rows в DataFrame хранятся как Row() объекты, мы используем функцию collect () , чтобы вернуть все записи как list из Row(). Просто чтобы проиллюстрировать вывод -

print(df_Incoming.collect())
[Row(Venue='Hotel', Date='20190101', 08=15, 10=3), Row(Venue='Beach', Date='20190101', 08=93, 10=45)]

Но, поскольку мы хотим list из dictionaries, мы можем использовать list comprehensions для преобразования их в единицу -

list_Incoming = [row.asDict() for row in df_Incoming.collect()]
print(list_Incoming)
[{'10': 3, 'Date': '20190101', 'Venue': 'Hotel', '08': 15}, {'10': 45, 'Date': '20190101', 'Venue': 'Beach', '08': 93}]

Но, поскольку числовые столбцы были в форме, подобной "08":{ "IncomingCount":15 }, вместо "08":15, поэтому мы используем dictionary comprehensions для преобразования их в эту форму -

list_Incoming = [ {k:v if k in ['Venue','Date'] else {'IncomingCount':v} for k,v in dict_element.items()} for dict_element in list_Incoming]
print(list_Incoming)
[{'10': {'IncomingCount': 3}, 'Date': '20190101', 'Venue': 'Hotel', '08': {'IncomingCount': 15}}, {'10': {'IncomingCount': 45}, 'Date': '20190101', 'Venue': 'Beach', '08': {'IncomingCount': 93}}]

Аналогично, мы делаем для OutgoingCount

list_Outgoing = [row.asDict() for row in df_Outgoing.collect()]
list_Outgoing = [ {k:v if k in ['Venue','Date'] else {'OutgoingCount':v} for k,v in dict_element.items()} for dict_element in list_Outgoing]
print(list_Outgoing)
[{'10': {'OutgoingCount': 5}, 'Date': '20190101', 'Venue': 'Beach', '07': {'OutgoingCount': 30}}, {'10': {'OutgoingCount': 15}, 'Date': '20190103', 'Venue': 'Hotel', '07': {'OutgoingCount': 5}}]

Последний шаг: Теперь, когда мы создали реквизит list из dictionaries, нам нужно объединить список на основе Venue и Date.

from copy import deepcopy
def merge_lists(list_Incoming, list_Outgoing):
    # create dictionary from list_Incoming:
    dict1 = {(record['Venue'], record['Date']): record  for record in list_Incoming}

    #compare elements in list_Outgoing to those on list_Incoming:

    result = {}
    for record in list_Outgoing:
        ckey = record['Venue'], record['Date']
        new_record = deepcopy(record)
        if ckey in dict1:
            for key, value in dict1[ckey].items():
                if key in ('Venue', 'Date'):
                    # Do not merge these keys
                    continue
                # Dict's "setdefault" finds a key/value, and if it is missing
                # creates a new one with the second parameter as value
                new_record.setdefault(key, {}).update(value)

        result[ckey] = new_record

    # Add values from list_Incoming that were not matched in list_Outgoing:
    for key, value in dict1.items():
        if key not in result:
            result[key] = deepcopy(value)

    return list(result.values())

res = merge_lists(list_Incoming, list_Outgoing)
print(res)
[{'10': {'OutgoingCount': 5, 'IncomingCount': 45}, 
  'Date': '20190101', 
  'Venue': 'Beach', 
  '08': {'IncomingCount': 93}, 
  '07': {'OutgoingCount': 30}
 },

 {'10': {'OutgoingCount': 15}, 
   'Date': '20190103', 
   'Venue': 'Hotel', 
   '07': {'OutgoingCount': 5}
 }, 

 {'10': {'IncomingCount': 3}, 
  'Date': '20190101', 
  'Venue': 'Hotel', 
  '08': {'IncomingCount': 15}
 }]
...