Google Dataflow Python Apache Проблема задержки оконного луча - PullRequest
0 голосов
/ 26 марта 2020

У меня есть простой конвейер, который получает данные от PubSub, распечатывает их, а затем каждые 10 секунд запускает окно в GroupByKey и печатает это сообщение снова.

Однако это окно иногда задерживается. Это ограничение Google или что-то не так с моим кодом:

 with beam.Pipeline(options=pipeline_options) as pipe:
        messages = (
            pipe
            | beam.io.ReadFromPubSub(subscription=known_args.input_subscription).with_output_types(bytes)
            | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
            | 'Ex' >> beam.ParDo(ExtractorAndPrinter())
            | beam.WindowInto(window.FixedWindows(10), allowed_lateness=0, accumulation_mode=AccumulationMode.DISCARDING, trigger=AfterProcessingTime(10) )
            | 'group' >> beam.GroupByKey()
            | 'PRINTER' >> beam.ParDo(PrinterWorker()))

enter image description here

Изменить для самого последнего кода. Я удалил триггеры, но проблема сохраняется:

class ExtractorAndCounter(beam.DoFn):
    def __init__(self):
        beam.DoFn.__init__(self)

    def process(self, element, *args, **kwargs):
        import logging
        logging.info(element)
        return [("Message", json.loads(element)["Message"])]

class PrinterWorker(beam.DoFn):
    def __init__(self):
        beam.DoFn.__init__(self)

    def process(self, element, *args, **kwargs):
        import logging
        logging.info(element)
        return [str(element)]


class DefineTimestamp(beam.DoFn):
    def process(self, element, *args, **kwargs):
        from datetime import datetime
        return [(str(datetime.now()), element)]

def run(argv=None, save_main_session=True):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
      '--output_topic',
      required=True,
      help=(
          'Output PubSub topic of the form '
          '"projects/<PROJECT>/topics/<TOPIC>".'))
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
      '--input_topic',
      help=(
          'Input PubSub topic of the form '
          '"projects/<PROJECT>/topics/<TOPIC>".'))
    group.add_argument(
      '--input_subscription',
      help=(
          'Input PubSub subscription of the form '
          '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'))
    known_args, pipeline_args = parser.parse_known_args(argv)

pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
pipeline_options.view_as(StandardOptions).streaming = True
with beam.Pipeline(options=pipeline_options) as pipe:
    messages = (
        pipe
        | beam.io.ReadFromPubSub(subscription=known_args.input_subscription).with_output_types(bytes)
        | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
        | 'Ex' >> beam.ParDo(ExtractorAndCounter())
        | beam.WindowInto(window.FixedWindows(10))
        | 'group' >> beam.GroupByKey()
        | 'PRINTER' >> beam.ParDo(PrinterWorker())
        | 'encode' >> beam.Map(lambda x: x.encode('utf-8'))
        | beam.io.WriteToPubSub(known_args.output_topic))



if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Ответы [ 2 ]

1 голос
/ 26 марта 2020

Итак, что в основном просит конвейер, это сгруппировать элементы по 10 секунд windows и запускать каждое окно по истечении 10 секунд с момента получения первого элемента для каждого окна (и отбрасывать оставшиеся данные для этого окно). Было ли это вашим намерением?

Предполагая, что это так, обратите внимание, что запуск зависит от времени, которое система получила, а также от времени получения первого элемента для каждого окна. Вероятно, именно поэтому вы видите некоторые изменения в ваших результатах.

Я думаю, если вам нужна более последовательная группировка для ваших элементов, вы должны использовать триггеры времени события вместо триггеров времени обработки.

0 голосов
/ 26 марта 2020

Все триггеры основаны на максимальном усилии, то есть они будут срабатывать через некоторое время после указанной продолжительности, в этом случае 10 сек c. Обычно это происходит близко к указанному времени, но возможна задержка в несколько секунд.

Кроме того, триггеры установлены для клавиши + окно. Окно получено из времени события. Вполне возможно, что первая пинта GBK в 10:30:04 обусловлена ​​первым элементом, который по состоянию на 10:29:52 Вторая печать GBK в 10:30:07 обусловлена ​​первым элементом в 10:29:56

Так что будет хорошо напечатать окно и метку времени события для каждого элемента, а затем сопоставить данные.

...