Метод ReduceByKey для агрегирования словарей - PullRequest
0 голосов
/ 16 апреля 2020

У меня есть метод spark, в котором я запускаю функцию flatMap, которая возвращает мне список кортежей. Значение ключа в кортеже - Timestamp, а значение - dict.

[(Timestamp('2000-01-01 00:00:00'),
  {'id': '1', 'val': '200M', 'date':Timestamp('2000-01-01 00:00:00')}),
 (Timestamp('2000-01-01 00:00:00'),
  {'id': '2', 'val': '10M', 'date':Timestamp('2000-01-01 00:00:00')}),
 (Timestamp('2000-01-01 00:00:00'),
  {'id': '3', 'val': '30M', 'date':Timestamp('2000-01-01 00:00:00')}),
 (Timestamp('2000-01-02 00:00:00'),
  {'id': '15', 'val': '120M', 'date':Timestamp('2000-01-02 00:00:00')}),
 (Timestamp('2000-01-02 00:00:00'),
  {'id': '3', 'val': '35M', 'date':Timestamp('2000-01-02 00:00:00')}),
 (Timestamp('2000-01-02 00:00:00'),
  {'id': '4', 'val': '56M', 'date':Timestamp('2000-01-02 00:00:00')}),
 (Timestamp('2000-01-03 00:00:00'),
  {'id': '6', 'val': '5M', 'date':Timestamp('2000-01-03 00:00:00')}),
 (Timestamp('2000-01-03 00:00:00'),
  {'id': '1', 'val': '25M', 'date':Timestamp('2000-01-03 00:00:00')}),
 (Timestamp('2000-01-03 00:00:00'),
  {'id': '2', 'val': '7M', 'date':Timestamp('2000-01-03 00:00:00')}),

Я пытаюсь запустить следующую функцию reduceByKey, которая дает мне:

[ (Timestamp('2000-01-01 00:00:00'),
  [{'id': '1', 'val': '200M', 'date':Timestamp('2000-01-01 00:00:00')},
   {'id': '2', 'val': '10M', 'date':Timestamp('2000-01-01 00:00:00')},
   {'id': '3', 'val': '30M', 'date':Timestamp('2000-01-01 00:00:00')}]),
  (Timestamp('2000-01-02 00:00:00'),
  [{'id': '15', 'val': '120M', 'date':Timestamp('2000-01-02 00:00:00')},
   {'id': '3', 'val': '35M', 'date':Timestamp('2000-01-02 00:00:00')},
   {'id': '4', 'val': '56M', 'date':Timestamp('2000-01-02 00:00:00')}]),
  (Timestamp('2000-01-03 00:00:00'),
  [{'id': '6', 'val': '5M', 'date':Timestamp('2000-01-03 00:00:00')},
   {'id': '1', 'val': '25M', 'date':Timestamp('2000-01-03 00:00:00')},
   {'id': '2', 'val': '7M', 'date':Timestamp('2000-01-03 00:00:00')}]) ]

Пока я пробовал это: output = rdd.flatMap(split_func).reduceByKey(lambda x, y: x+y).collect()

но я получаю эту ошибку: TypeError: unsupported operand type(s) for +: 'dict' and 'dict'

Заранее спасибо!

1 Ответ

0 голосов
/ 16 апреля 2020

Это больше python ошибка. Если d1 и d2 являются словарями, то d1 + d2 не работает. Тем не менее, вы можете сделать {**d1, **d2}. Если d1 и d2 имеют одну и ту же клавишу, она примет значение от d2.

Так что вы можете сделать output = rdd.flatMap(split_func).reduceByKey(lambda x, y: {**x, **y}).collect()

Однако, в результате вы получите список кортежей. Так что в этом случае я думаю, что groupByKey лучше: output = rdd.flatMap(split_func).groupByKey().mapValues(list).collect()

...