Объединение Python словарей в фрейм данных Spark, когда словари имеют разные ключи - PullRequest
0 голосов
/ 27 февраля 2020

Если у меня есть список словарей, который выглядит примерно так:

list = [{'a': 1, 'b': 2, 'c': 3}, {'b': 4, 'c': 5, 'd': 6, 'e': 7}]

Как я могу преобразовать список в фрейм данных Spark, не удаляя ключи, которые не могут быть общими для словарей? Например, если я использую s c .parallelize (list) .toDF (), результирующий фрейм данных будет иметь столбцы 'a', 'b' и 'c', а столбец 'a' будет нулевым для второго словарь, и столбцы 'd' и 'e' из второго словаря будут полностью отброшены.

Из-за того, что я играю с порядком словарей, я вижу, что он относится к ключам в словаре, который появляется первым в списке, поэтому, если бы я поменял местами словари в моем примере выше, мой результирующий кадр данных будет иметь столбцы 'b', 'c', 'd' и 'e'.

В действительности в этом списке будет гораздо больше двух словарей, и не будет никакой гарантии, что ключи будут одинаковыми от словаря к словарю, поэтому важно найти надежный способ обработки потенциально разных ключей.

1 Ответ

1 голос
/ 27 февраля 2020

Вы можете передать словарь в функцию createDataFrame.

l = [{'a': 1, 'b': 2, 'c': 3}, {'b': 4, 'c': 5, 'd': 6, 'e': 7}]
df = spark.createDataFrame(l)
#UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row instead
#warnings.warn("inferring schema from dict is deprecated
df.show()

+----+---+---+----+----+
|   a|  b|  c|   d|   e|
+----+---+---+----+----+
|   1|  2|  3|null|null|
|null|  4|  5|   6|   7|
+----+---+---+----+----+

Также укажите schema для столбцов, поскольку вывод схемы для словарей устарел. Использование Row объектов для создания фрейма данных требует, чтобы все словари имели одинаковые столбцы.

Программное определение схемы путем слияния ключей из всех используемых словарей.

from pyspark.sql.types import StructType,StructField,IntegerType

#Function to merge keys from several dicts
def merge_keys(*dict_args):
    result = set()
    for dict_arg in dict_args:
        for key in dict_arg.keys():
            result.add(key)
    return sorted(list(result))

#Generate schema given a column list
def generate_schema(columns):
    result = StructType()
    for column in columns:
        result.add(column,IntegerType(),nullable=True) #change type and nullability as needed
    return result

df = spark.createDataFrame(l,schema=generate_schema(merge_keys(*l)))
...