почему функция комбинирования вызывается трижды? - PullRequest
1 голос
/ 03 августа 2020

Я пытаюсь понять комбайн трансформатора в конвейере пучка apache.

Учитывая следующий пример конвейера:

def test_combine(data):
    logging.info('test combine')
    logging.info(type(data))
    logging.info(data)
    return [1, 2, 3]


def run():
    logging.info('start pipeline')
    pipeline_options = PipelineOptions(
        None, streaming=True, save_main_session=True,
    )
    p = beam.Pipeline(options=pipeline_options)

    data = p | beam.Create([
        {'id': '1', 'ts': datetime.datetime.utcnow()},
        {'id': '2', 'ts': datetime.datetime.utcnow()},
        {'id': '3', 'ts': datetime.datetime.utcnow()}
    ])

    purchase_paths = (
        data
        | WindowInto(FixedWindows(10))
        | beam.CombineGlobally(test_combine).without_defaults()
    )

    result = p.run()
    result.wait_until_finish()
    logging.info('end pipeline')


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Создает следующий вывод журнала:

INFO:root:test combine
INFO:root:<class 'list'>
INFO:root:[{'id': '1', 'ts': datetime.datetime(2020, 8, 3, 19, 22, 53, 193363)}, {'id': '2', 'ts': datetime.datetime(2020, 8, 3, 19, 22, 53, 193366)}, {'id': '3', 'ts': datetime.datetime(2020, 8, 3, 19, 22, 53, 193367)}]
INFO:root:test combine
INFO:root:<class 'apache_beam.transforms.core._ReiterableChain'>
INFO:root:<apache_beam.transforms.core._ReiterableChain object at 0x1210faf50>
INFO:root:test combine
INFO:root:<class 'list'>
INFO:root:[[1, 2, 3]]
INFO:root:end pipeline

Почему функция комбинирования вызывается трижды и каждый раз получает разные входные данные? В последнем вызове он, кажется, получает собственное возвращаемое значение в качестве входных.

Обновление

Я неправильно понял из объединителя. В документации сказано:

Функция комбинирования должна быть коммутативной и ассоциативной, поскольку функция не обязательно вызывается ровно один раз для всех значений с заданным ключом

Действительно Выход объединителя может быть снова использован в качестве входных данных для объединителя для агрегирования со следующими элементами коллекции pcollection. Таким образом, вывод объединителя должен быть в том же формате, что и вход объединителя.

Также, как указал Ini go, мне нужно было установить значение временной метки в pcollection, чтобы окно работало правильно .

Это обновленный пример:

combine_count = 0
   

def test_combine(data):
    global combine_count
    combine_count += 1
    logging.info(f'test combine: {combine_count}')
    logging.info(f'input: {list(data)}')
    combined_id = '+'.join([d['id'] for d in data])
    combined_ts = max([d['ts'] for d in data])
    combined = {'id': combined_id, 'ts': combined_ts}
    logging.info(f'output: {combined}')
    return combined


def run():
    logging.info('start pipeline')
    pipeline_options = PipelineOptions(
        None, streaming=True, save_main_session=True,
    )
    p = beam.Pipeline(options=pipeline_options)

    ts = int(time.time())

    data = p | beam.Create([
        {'id': '1', 'ts': ts},
        {'id': '2', 'ts': ts + 5},
        {'id': '3', 'ts': ts + 12}
    ])

    purchase_paths = (
        data
        | 'With timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['ts']))
        | WindowInto(FixedWindows(10))
        | beam.CombineGlobally(test_combine).without_defaults()
    )

    result = p.run()
    result.wait_until_finish()
    logging.info('end pipeline')


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Результат этого примера выглядит следующим образом:

INFO:root:test combine: 1
INFO:root:input: [{'id': '2', 'ts': 1596791192}, {'id': '3', 'ts': 1596791199}]
INFO:root:output: {'id': '2+3', 'ts': 1596791199}
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((CombineGlobally(test_combine)/CombinePerKey/Group/Read)+(CombineGlobally(test_combine)/CombinePerKey/Merge))+(CombineGlobally(test_combine)/CombinePerKey/ExtractOutputs))+(ref_AppliedPTransform_CombineGlobally(test_combine)/UnKey_28)
INFO:root:test combine: 2
INFO:root:input: [{'id': '1', 'ts': 1596791187}]
INFO:root:output: {'id': '1', 'ts': 1596791187}
INFO:root:test combine: 3
INFO:root:input: [{'id': '1', 'ts': 1596791187}]
INFO:root:output: {'id': '1', 'ts': 1596791187}
INFO:root:test combine: 4
INFO:root:input: [{'id': '2+3', 'ts': 1596791199}]
INFO:root:output: {'id': '2+3', 'ts': 1596791199}
INFO:root:test combine: 5
INFO:root:input: [{'id': '2+3', 'ts': 1596791199}]
INFO:root:output: {'id': '2+3', 'ts': 1596791199}
INFO:root:end pipeline

Я до сих пор не совсем понимаю, почему комбайнер называется так много раз. Но согласно документации это может случиться.

1 Ответ

2 голосов
/ 04 августа 2020

Похоже, это происходит из-за структуры MapReduce. При использовании комбайнеров вывод одного комбайнера используется как вход.

В качестве примера представьте, что суммируются 3 числа (1, 2, 3). Объединитель МОЖЕТ суммировать первые 1 и 2 (3) и использовать это число в качестве входных данных с 3 (3 + 3 = 6). В вашем случае [1, 2, 3], кажется, используется как вход в следующем объединителе.

Пример, который действительно помог мне понять это:

p = beam.Pipeline()

def make_list(elements):
    print(elements)
    return elements

(p | Create(range(30))
   | beam.core.CombineGlobally(make_list))

p.run()

Видно, что элемент [1,..,10] - используется в следующем сумматоре.

...