Apache Beam Python Windowing и GroupByKey - PullRequest
1 голос
/ 12 марта 2020

LE: TL; DR; Как мне создать неограниченный источник данных в Python? Возможно ли это?

Я создаю поток потоковых данных, который будет непрерывно обрабатывать значения с плавающей точкой, поступающие от датчиков, которые имеют временную метку, идентификатор и значение чтения, помещает значения в FixedWindows в 2 секунды, затем выведите агрегацию.

Кодовая ссылка: https://gist.github.com/nicolaerosia/51981c600dacab4c021d99c0ce838b79

Вот конвейер для быстрого просмотра:

    files = [
        "in.csv",
    ]

    fields = (p | beam.Create(files).with_output_types(str)
                | beam.ParDo(FileReader())
                | "ParseRawLine" >> beam.ParDo(ParseRawLine())
                | "AddEventTimestamp" >> beam.Map(lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
                | "window" >> beam.WindowInto(windowfn=beam.transforms.window.FixedWindows(2),
                    trigger=AfterWatermark(late=AfterProcessingTime(1)),
                    accumulation_mode=AccumulationMode.DISCARDING,
                )
                | "MapID" >> beam.Map(lambda x: (x['id'], x['value']))
                | beam.GroupByKey()
                | "DummyWindowPrint" >> beam.ParDo(DummyWindowPrint())
    )

У меня проблема в том, что операция после GroupByKey начинается только после завершения ввода, однако я хочу использовать триггер на WindowInto, который сработает через 1 секунду после поступления последней записи в окне.

Пример вывода:

DEBUG:root:ParseRawLine(140117071822672): entry: {'timestamp': Timestamp(1583964059.983996), 'id': 79, 'value': '0.6312056749059605'}
DEBUG:apache_beam.runners.worker.bundle_processor:finish <DataInputOperation Create/Impulse receivers=[SingletonConsumerSet[Create/Impulse.out0, coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
DEBUG:apache_beam.runners.worker.bundle_processor:finish <DoOperation Create/FlatMap(<lambda at core.py:2597>) output_tags=['out'], receivers=[SingletonConsumerSet[Create/FlatMap(<lambda at core.py:2597>).out0, coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
DEBUG:apache_beam.runners.worker.bundle_processor:finish <DoOperation Create/Map(decode) output_tags=['out'], receivers=[SingletonConsumerSet[Create/Map(decode).out0, coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]>
DEBUG:apache_beam.runners.worker.bundle_processor:finish <DoOperation ParDo(FileReader) output_tags=['out'], receivers=[SingletonConsumerSet[ParDo(FileReader).out0, coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]>
DEBUG:root:FileReader(140117071413776): finish_bundle
DEBUG:apache_beam.runners.worker.bundle_processor:finish <DoOperation ParseRawLine output_tags=['out'], receivers=[SingletonConsumerSet[ParseRawLine.out0, coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=1]]>
DEBUG:apache_beam.runners.worker.bundle_processor:finish <DoOperation AddEventTimestamp output_tags=['out'], receivers=[SingletonConsumerSet[AddEventTimestamp.out0, coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=1]]>
DEBUG:apache_beam.runners.worker.bundle_processor:finish <DoOperation window output_tags=['out'], receivers=[SingletonConsumerSet[window.out0, coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=1]]>
DEBUG:apache_beam.runners.worker.bundle_processor:finish <DoOperation MapID output_tags=['out'], receivers=[SingletonConsumerSet[MapID.out0, coder=WindowedValueCoder[TupleCoder[LengthPrefixCoder[FastPrimitivesCoder], LengthPrefixCoder[FastPrimitivesCoder]]], len(consumers)=1]]>
DEBUG:apache_beam.runners.worker.bundle_processor:finish <DataOutputOperation GroupByKey/Write >
DEBUG:apache_beam.runners.portability.fn_api_runner:Wait for the bundle bundle_1 to finish.

<!!!!!! HERE !!!!!!>
INFO:apache_beam.runners.portability.fn_api_runner:Running (GroupByKey/Read)+(ref_AppliedPTransform_DummyWindowPrint_16)
<!!!!!! HERE !!!!!!>

DEBUG:apache_beam.runners.worker.sdk_worker:Got work control_10
DEBUG:apache_beam.runners.worker.sdk_worker:Got work control_9

Испытанные версии луча:

  • master (2020 12 марта)

  • release-2.19.0

  • release-2.20.0

Вызовы:

python3 beam_issues.py \
--streaming \
--runner=DirectRunner

ИЛИ

python3 beam_issues.py \
--streaming \
--runner=DirectRunner \
--direct_num_workers=8 \
--direct_running_mode=multi_threading

Чего мне не хватает

...