Можете ли вы переформатировать ваши данные, чтобы вы могли заменить CoGroupByKey
на CombinePerKey
?
CoGroupByKey
& GroupByKey
строят списки всех совпадений, которые могут быть полученыбольшой, но вы заботитесь только о графе, верно?Таким образом, вы можете использовать CombinePerKey
с CombineFn
, который считает их по мере их поступления
Переформатировать ваши PCollections примерно так:
pcoll_a = [('abc','123'), ('abc', '456'), ...]
pcoll_b = [('abc','123'), ('xyz', '456'), ...]
В что-то вроде этого:
pcoll_a = [('abc,123', 'A'), ('abc,456', 'A'), ...]
pcoll_b = [('abc,123', 'B'), ('xyz,456', 'B'), ...]
Сведите эти 2 PCollections вместе:
pcoll_combined = [('abc,123', 'A'), ('abc,456', 'A'), ('abc,123', 'B'), ('xyz,456', 'B'), ...]
Передайте это в CombinePerKey
с CombineFn
, который суммирует счет по мере вашего продвижения.Как то так:
class CountFn(apache_beam.core.CombineFn):
def _add_inputs(self, elements, accumulator=None):
accumulator = accumulator or self.create_accumulator()
for obj in elements:
if obj == 'A':
accumulator['sum_A'] += 1
if obj == 'B':
accumulator['sum_B'] += 1
return accumulator
def create_accumulator(self):
return {'sum_A': 0, 'sum_B': 0}
def add_input(self, accumulator, element, *args, **kwargs):
return self._add_inputs(elements=[element], accumulator=accumulator)
def add_inputs(self, accumulator, elements, *args, **kwargs):
return self._add_inputs(elements=elements, accumulator=accumulator)
def merge_accumulators(self, accumulators, *args, **kwargs):
return {
'sum_A': sum([i['sum_A'] for i in accumulators]),
'sum_B': sum([i['sum_B'] for i in accumulators])}
def extract_output(self, accumulator, *args, **kwargs):
return accumulator