Beam / Dataflow - большие результаты CoGroupByKey вызывают медленный конвейер - PullRequest
0 голосов
/ 05 декабря 2018

У меня есть 2 PCollection<KV<String, String>>, один имеет размер ~ 150M, а второй ~ 2B.

Что я хочу сделать, это подсчитать количество появлений каждой уникальной пары значений из обеих PCollection.

Так что я сделал CoGroupByKey на этих 2 PCollections, проблема в том, что некоторые (~ 5M) из CoGbkResult очень большие (я получаю сообщения журнала в Dataflow, говорящие о том, что CoGbkResult имеет большечем 10K), поскольку в обеих коллекциях каждый ключ может появляться много раз, и это приводит к очень длительному времени выполнения у рабочих, которые получают эти ключи.

В идеале я бы хотел, чтобы CoGroupByKey вернул PCollection, чтосодержит все пары значений из обоих PCollection, сгруппированных по ключу, поэтому я не могу подсчитать их таким образом, чтобы распараллеливание было лучше.

Я читал об этой проблеме, но, похоже, нет решенияэто подходит мне (большинство из которых включают использование Combine.WithHotKeyFanout), так как мне требуется дополнительный шаг отображения перед объединением, который занимает вечность из-за размера CoGbkResult.Любые предложения, как это исправить?

1 Ответ

0 голосов
/ 13 декабря 2018

Можете ли вы переформатировать ваши данные, чтобы вы могли заменить CoGroupByKey на CombinePerKey?

CoGroupByKey & GroupByKey строят списки всех совпадений, которые могут быть полученыбольшой, но вы заботитесь только о графе, верно?Таким образом, вы можете использовать CombinePerKey с CombineFn, который считает их по мере их поступления

Переформатировать ваши PCollections примерно так:

pcoll_a = [('abc','123'), ('abc', '456'), ...]
pcoll_b = [('abc','123'), ('xyz', '456'), ...]

В что-то вроде этого:

pcoll_a = [('abc,123', 'A'), ('abc,456', 'A'), ...]
pcoll_b = [('abc,123', 'B'), ('xyz,456', 'B'), ...]

Сведите эти 2 PCollections вместе:

pcoll_combined = [('abc,123', 'A'), ('abc,456', 'A'), ('abc,123', 'B'), ('xyz,456', 'B'), ...]

Передайте это в CombinePerKey с CombineFn, который суммирует счет по мере вашего продвижения.Как то так:

class CountFn(apache_beam.core.CombineFn):
    def _add_inputs(self, elements, accumulator=None):
        accumulator = accumulator or self.create_accumulator()
        for obj in elements:
            if obj == 'A':
                accumulator['sum_A'] += 1
            if obj == 'B':
                accumulator['sum_B'] += 1
        return accumulator

    def create_accumulator(self):
        return {'sum_A': 0, 'sum_B': 0}

    def add_input(self, accumulator, element, *args, **kwargs):
        return self._add_inputs(elements=[element], accumulator=accumulator)

    def add_inputs(self, accumulator, elements, *args, **kwargs):
        return self._add_inputs(elements=elements, accumulator=accumulator)

    def merge_accumulators(self, accumulators, *args, **kwargs):
        return {
            'sum_A': sum([i['sum_A'] for i in accumulators]),
            'sum_B': sum([i['sum_B'] for i in accumulators])}

    def extract_output(self, accumulator, *args, **kwargs):
        return accumulator
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...