LE: TL; DR; Как мне создать неограниченный источник данных в Python? Возможно ли это?
Я создаю поток потоковых данных, который будет непрерывно обрабатывать значения с плавающей точкой, поступающие от датчиков, которые имеют временную метку, идентификатор и значение чтения, помещает значения в FixedWindows
в 2 секунды, затем выведите агрегацию.
Кодовая ссылка: https://gist.github.com/nicolaerosia/51981c600dacab4c021d99c0ce838b79
Вот конвейер для быстрого просмотра:
files = [
"in.csv",
]
fields = (p | beam.Create(files).with_output_types(str)
| beam.ParDo(FileReader())
| "ParseRawLine" >> beam.ParDo(ParseRawLine())
| "AddEventTimestamp" >> beam.Map(lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
| "window" >> beam.WindowInto(windowfn=beam.transforms.window.FixedWindows(2),
trigger=AfterWatermark(late=AfterProcessingTime(1)),
accumulation_mode=AccumulationMode.DISCARDING,
)
| "MapID" >> beam.Map(lambda x: (x['id'], x['value']))
| beam.GroupByKey()
| "DummyWindowPrint" >> beam.ParDo(DummyWindowPrint())
)
У меня проблема в том, что операция после GroupByKey
начинается только после завершения ввода, однако я хочу использовать триггер на WindowInto
, который сработает через 1 секунду после поступления последней записи в окне.
Пример вывода:
DEBUG:root:ParseRawLine(140117071822672): entry: {'timestamp': Timestamp(1583964059.983996), 'id': 79, 'value': '0.6312056749059605'}
DEBUG:apache_beam.runners.worker.bundle_processor:finish <DataInputOperation Create/Impulse receivers=[SingletonConsumerSet[Create/Impulse.out0, coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
DEBUG:apache_beam.runners.worker.bundle_processor:finish <DoOperation Create/FlatMap(<lambda at core.py:2597>) output_tags=['out'], receivers=[SingletonConsumerSet[Create/FlatMap(<lambda at core.py:2597>).out0, coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
DEBUG:apache_beam.runners.worker.bundle_processor:finish <DoOperation Create/Map(decode) output_tags=['out'], receivers=[SingletonConsumerSet[Create/Map(decode).out0, coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]>
DEBUG:apache_beam.runners.worker.bundle_processor:finish <DoOperation ParDo(FileReader) output_tags=['out'], receivers=[SingletonConsumerSet[ParDo(FileReader).out0, coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]>
DEBUG:root:FileReader(140117071413776): finish_bundle
DEBUG:apache_beam.runners.worker.bundle_processor:finish <DoOperation ParseRawLine output_tags=['out'], receivers=[SingletonConsumerSet[ParseRawLine.out0, coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=1]]>
DEBUG:apache_beam.runners.worker.bundle_processor:finish <DoOperation AddEventTimestamp output_tags=['out'], receivers=[SingletonConsumerSet[AddEventTimestamp.out0, coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=1]]>
DEBUG:apache_beam.runners.worker.bundle_processor:finish <DoOperation window output_tags=['out'], receivers=[SingletonConsumerSet[window.out0, coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=1]]>
DEBUG:apache_beam.runners.worker.bundle_processor:finish <DoOperation MapID output_tags=['out'], receivers=[SingletonConsumerSet[MapID.out0, coder=WindowedValueCoder[TupleCoder[LengthPrefixCoder[FastPrimitivesCoder], LengthPrefixCoder[FastPrimitivesCoder]]], len(consumers)=1]]>
DEBUG:apache_beam.runners.worker.bundle_processor:finish <DataOutputOperation GroupByKey/Write >
DEBUG:apache_beam.runners.portability.fn_api_runner:Wait for the bundle bundle_1 to finish.
<!!!!!! HERE !!!!!!>
INFO:apache_beam.runners.portability.fn_api_runner:Running (GroupByKey/Read)+(ref_AppliedPTransform_DummyWindowPrint_16)
<!!!!!! HERE !!!!!!>
DEBUG:apache_beam.runners.worker.sdk_worker:Got work control_10
DEBUG:apache_beam.runners.worker.sdk_worker:Got work control_9
Испытанные версии луча:
master (2020 12 марта)
release-2.19.0
release-2.20.0
Вызовы:
python3 beam_issues.py \
--streaming \
--runner=DirectRunner
ИЛИ
python3 beam_issues.py \
--streaming \
--runner=DirectRunner \
--direct_num_workers=8 \
--direct_running_mode=multi_threading
Чего мне не хватает