сумма столбца данных pyspark, содержащего словари - PullRequest
0 голосов
/ 05 июля 2018

У меня есть кадр данных, содержащий только один столбец с элементами типа MapType(StringType(), IntegerType()). Я хотел бы получить кумулятивную сумму для этого столбца, где операция sum будет означать добавление двух словарей.

Минимальный пример

a = [{'Maps': ({'a': 1, 'b': 2, 'c': 3})}, {'Maps': ({'a': 2, 'b': 4, 'd': 6})}]
df = spark.createDataFrame(a)
df.show(5, False)

+---------------------------+
|Maps                       |
+---------------------------+
|Map(a -> 1, b -> 2, c -> 3)|
|Map(a -> 2, b -> 4, d -> 6)|
+---------------------------+

Если бы я получил суммарную сумму столбца Maps, я должен получить следующий результат.

+-----------------------------------+
|Maps                               |
+-----------------------------------+
|Map(a -> 3, b -> 6, c -> 3, d -> 6)|
+-----------------------------------+

P. S. Я использую Python 2.6, поэтому collections.Counter недоступен. Я, вероятно, могу установить его, если это абсолютно необходимо.

Мои попытки:

Я попробовал подход, основанный на accumulator, и подход, использующий fold.

Аккумулятор

def addDictFun(x):
    global v
    v += x

class DictAccumulatorParam(AccumulatorParam):
    def zero(self, d):
        return d
    def addInPlace(self, d1, d2):
        for k in d1:
            d1[k] = d1[k] + (d2[k] if k in d2 else 0)
        for k in d2:
            if k not in d1:
                d1[k] = d2[k]
        return d1

v = sc.accumulator(MapType(StringType(), IntegerType()), DictAccumulatorParam())
cumsum_dict = df.rdd.foreach(addDictFun)

Теперь в конце у меня должен быть полученный словарь в v. Вместо этого я получаю ошибку MapType не повторяемую (в основном это строка for k in d1 в функции addInPlace).

rdd.fold

Подход, основанный на rdd.fold, выглядит следующим образом:

def add_dicts(d1, d2):
    for k in d1:
        d1[k] = d1[k] + (d2[k] if k in d2 else 0)
    for k in d2:
        if k not in d1:
            d1[k] = d2[k]
    return d1

cumsum_dict = df.rdd.fold(MapType(StringType(), IntegerType()), add_dicts)

Однако здесь я получаю ту же ошибку MapType is not iterable. Любая идея, где я иду не так?

Ответы [ 2 ]

0 голосов
/ 05 июля 2018

@ user8371915 ответ с использованием explode является более общим, но вот другой подход, который может быть быстрее, если вы знали ключи заранее:

import pyspark.sql.functions as f
myKeys = ['a', 'b', 'c', 'd']
df.select(*[f.sum(f.col('Maps').getItem(k)).alias(k) for k in myKeys]).show()
#+---+---+---+---+
#|  a|  b|  c|  d|
#+---+---+---+---+
#|  3|  6|  3|  6|
#+---+---+---+---+

И если вы хотите получить результат в MapType(), вы можете использовать pyspark.sql.functions.create_map как:

from itertools import chain
df.select(
    f.create_map(
        list(
            chain.from_iterable(
                [[f.lit(k), f.sum(f.col('Maps').getItem(k))] for k in myKeys]
            )
        )
    ).alias("Maps")
).show(truncate=False)
#+-----------------------------------+
#|Maps                               |
#+-----------------------------------+
#|Map(a -> 3, b -> 6, c -> 3, d -> 6)|
#+-----------------------------------+
0 голосов
/ 05 июля 2018

pyspark.sql.types являются дескрипторами схемы, а не коллекциями или представлениями внешнего языка, поэтому не могут использоваться с fold или Accumulator.

Самое простое решение - explode и совокупность

from pyspark.sql.functions import explode

df = spark.createDataFrame(
    [{'a': 1, 'b': 2, 'c': 3}, {'a': 2, 'b': 4, 'd': 6}], 
    "map<string,integer>"
).toDF("Maps")

df.select(explode("Maps")).groupBy("key").sum("value").rdd.collectAsMap()
# {'d': 6, 'c': 3, 'b': 6, 'a': 3}  

С RDD вы можете сделать похожую вещь:

from operator import add

df.rdd.flatMap(lambda row: row.Maps.items()).reduceByKey(add).collectAsMap()
# {'b': 6, 'c': 3, 'a': 3, 'd': 6}

или если вы действительно хотите fold

from operator import attrgetter
from collections import defaultdict

def merge(acc, d):
    for k in d:
        acc[k] += d[k]
    return acc

df.rdd.map(attrgetter("Maps")).fold(defaultdict(int), merge)
# defaultdict(int, {'a': 3, 'b': 6, 'c': 3, 'd': 6})
...