Я пытаюсь понять комбайн трансформатора в конвейере пучка apache.
Учитывая следующий пример конвейера:
def test_combine(data):
logging.info('test combine')
logging.info(type(data))
logging.info(data)
return [1, 2, 3]
def run():
logging.info('start pipeline')
pipeline_options = PipelineOptions(
None, streaming=True, save_main_session=True,
)
p = beam.Pipeline(options=pipeline_options)
data = p | beam.Create([
{'id': '1', 'ts': datetime.datetime.utcnow()},
{'id': '2', 'ts': datetime.datetime.utcnow()},
{'id': '3', 'ts': datetime.datetime.utcnow()}
])
purchase_paths = (
data
| WindowInto(FixedWindows(10))
| beam.CombineGlobally(test_combine).without_defaults()
)
result = p.run()
result.wait_until_finish()
logging.info('end pipeline')
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Создает следующий вывод журнала:
INFO:root:test combine
INFO:root:<class 'list'>
INFO:root:[{'id': '1', 'ts': datetime.datetime(2020, 8, 3, 19, 22, 53, 193363)}, {'id': '2', 'ts': datetime.datetime(2020, 8, 3, 19, 22, 53, 193366)}, {'id': '3', 'ts': datetime.datetime(2020, 8, 3, 19, 22, 53, 193367)}]
INFO:root:test combine
INFO:root:<class 'apache_beam.transforms.core._ReiterableChain'>
INFO:root:<apache_beam.transforms.core._ReiterableChain object at 0x1210faf50>
INFO:root:test combine
INFO:root:<class 'list'>
INFO:root:[[1, 2, 3]]
INFO:root:end pipeline
Почему функция комбинирования вызывается трижды и каждый раз получает разные входные данные? В последнем вызове он, кажется, получает собственное возвращаемое значение в качестве входных.
Обновление
Я неправильно понял из объединителя. В документации сказано:
Функция комбинирования должна быть коммутативной и ассоциативной, поскольку функция не обязательно вызывается ровно один раз для всех значений с заданным ключом
Действительно Выход объединителя может быть снова использован в качестве входных данных для объединителя для агрегирования со следующими элементами коллекции pcollection. Таким образом, вывод объединителя должен быть в том же формате, что и вход объединителя.
Также, как указал Ini go, мне нужно было установить значение временной метки в pcollection, чтобы окно работало правильно .
Это обновленный пример:
combine_count = 0
def test_combine(data):
global combine_count
combine_count += 1
logging.info(f'test combine: {combine_count}')
logging.info(f'input: {list(data)}')
combined_id = '+'.join([d['id'] for d in data])
combined_ts = max([d['ts'] for d in data])
combined = {'id': combined_id, 'ts': combined_ts}
logging.info(f'output: {combined}')
return combined
def run():
logging.info('start pipeline')
pipeline_options = PipelineOptions(
None, streaming=True, save_main_session=True,
)
p = beam.Pipeline(options=pipeline_options)
ts = int(time.time())
data = p | beam.Create([
{'id': '1', 'ts': ts},
{'id': '2', 'ts': ts + 5},
{'id': '3', 'ts': ts + 12}
])
purchase_paths = (
data
| 'With timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['ts']))
| WindowInto(FixedWindows(10))
| beam.CombineGlobally(test_combine).without_defaults()
)
result = p.run()
result.wait_until_finish()
logging.info('end pipeline')
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Результат этого примера выглядит следующим образом:
INFO:root:test combine: 1
INFO:root:input: [{'id': '2', 'ts': 1596791192}, {'id': '3', 'ts': 1596791199}]
INFO:root:output: {'id': '2+3', 'ts': 1596791199}
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((CombineGlobally(test_combine)/CombinePerKey/Group/Read)+(CombineGlobally(test_combine)/CombinePerKey/Merge))+(CombineGlobally(test_combine)/CombinePerKey/ExtractOutputs))+(ref_AppliedPTransform_CombineGlobally(test_combine)/UnKey_28)
INFO:root:test combine: 2
INFO:root:input: [{'id': '1', 'ts': 1596791187}]
INFO:root:output: {'id': '1', 'ts': 1596791187}
INFO:root:test combine: 3
INFO:root:input: [{'id': '1', 'ts': 1596791187}]
INFO:root:output: {'id': '1', 'ts': 1596791187}
INFO:root:test combine: 4
INFO:root:input: [{'id': '2+3', 'ts': 1596791199}]
INFO:root:output: {'id': '2+3', 'ts': 1596791199}
INFO:root:test combine: 5
INFO:root:input: [{'id': '2+3', 'ts': 1596791199}]
INFO:root:output: {'id': '2+3', 'ts': 1596791199}
INFO:root:end pipeline
Я до сих пор не совсем понимаю, почему комбайнер называется так много раз. Но согласно документации это может случиться.