Я получаю поток сложного и вложенного объекта JSON в качестве входных данных для моего конвейера.
Моя цель - создать небольшие пакеты для передачи в другую тему pubsub
для последующей обработки.Я борюсь с функцией beam.beam.GroupByKey()
- из того, что я прочитал, это правильный метод для объединения.
Упрощенный пример, события ввода:
{ data:['a', 'b', 'c'], url: 'websiteA.com' }
{ data:['a', 'b', 'c'], url: 'websiteA.com' }
{ data:['a'], url: 'websiteB.com' }
Я пытаюсь создать следующее:
{
'websiteA.com': {a:2, b:2, c:2},
'websiteB.com': {a:1},
}
Моя проблема заключается в попытке сгруппировать что-либо ещечто самый простой кортеж выбрасывает ValueError: too many values to unpack
.
Я мог бы выполнить это в два этапа, но из моего чтения использование beam.GroupByKey()
стоит дорого и поэтому должно быть сведено к минимуму.
РЕДАКТИРОВАТЬ на основе ответаот @ Cubez.
Это моя функция объединения, которая, кажется, наполовину работает: (
class MyCustomCombiner(beam.CombineFn):
def create_accumulator(self):
logging.info('accum_created') #Logs OK!
return {}
def add_input(self, counts, input):
counts = {}
for i in input:
counts[i] = 1
logging.info(counts) #Logs OK!
return counts
def merge_accumulators(self, accumulators):
logging.info('accumcalled') #never logs anything
c = collections.Counter()
for d in accumulators:
c.update(d)
logging.info('accum: %s', accumulators) #never logs anything
return dict(c)
def extract_output(self, counts):
logging.info('Counts2: %s', counts) #never logs anything
return counts
Кажется, что в прошлом add_input
ничего не вызывается?
Добавление конвейеракод:
with beam.Pipeline(argv=pipeline_args) as p:
raw_loads_dict = (p
| 'ReadPubsubLoads' >> ReadFromPubSub(topic=PUBSUB_TOPIC_NAME).with_output_types(bytes)
| 'JSONParse' >> beam.Map(lambda x: json.loads(x))
)
fixed_window_events = (raw_loads_dict
| 'KeyOnUrl' >> beam.Map(lambda x: (x['client_id'], x['events']))
| '1MinWindow' >> beam.WindowInto(window.FixedWindows(60))
| 'CustomCombine' >> beam.CombinePerKey(MyCustomCombiner())
)
fixed_window_events | 'LogResults2' >> beam.ParDo(LogResults())