У меня есть кадр данных, содержащий только один столбец с элементами типа MapType(StringType(), IntegerType())
. Я хотел бы получить кумулятивную сумму для этого столбца, где операция sum
будет означать добавление двух словарей.
Минимальный пример
a = [{'Maps': ({'a': 1, 'b': 2, 'c': 3})}, {'Maps': ({'a': 2, 'b': 4, 'd': 6})}]
df = spark.createDataFrame(a)
df.show(5, False)
+---------------------------+
|Maps |
+---------------------------+
|Map(a -> 1, b -> 2, c -> 3)|
|Map(a -> 2, b -> 4, d -> 6)|
+---------------------------+
Если бы я получил суммарную сумму столбца Maps
, я должен получить следующий результат.
+-----------------------------------+
|Maps |
+-----------------------------------+
|Map(a -> 3, b -> 6, c -> 3, d -> 6)|
+-----------------------------------+
P. S. Я использую Python 2.6, поэтому collections.Counter
недоступен. Я, вероятно, могу установить его, если это абсолютно необходимо.
Мои попытки:
Я попробовал подход, основанный на accumulator
, и подход, использующий fold
.
Аккумулятор
def addDictFun(x):
global v
v += x
class DictAccumulatorParam(AccumulatorParam):
def zero(self, d):
return d
def addInPlace(self, d1, d2):
for k in d1:
d1[k] = d1[k] + (d2[k] if k in d2 else 0)
for k in d2:
if k not in d1:
d1[k] = d2[k]
return d1
v = sc.accumulator(MapType(StringType(), IntegerType()), DictAccumulatorParam())
cumsum_dict = df.rdd.foreach(addDictFun)
Теперь в конце у меня должен быть полученный словарь в v
. Вместо этого я получаю ошибку MapType
не повторяемую (в основном это строка for k in d1
в функции addInPlace
).
rdd.fold
Подход, основанный на rdd.fold
, выглядит следующим образом:
def add_dicts(d1, d2):
for k in d1:
d1[k] = d1[k] + (d2[k] if k in d2 else 0)
for k in d2:
if k not in d1:
d1[k] = d2[k]
return d1
cumsum_dict = df.rdd.fold(MapType(StringType(), IntegerType()), add_dicts)
Однако здесь я получаю ту же ошибку MapType is not iterable
. Любая идея, где я иду не так?