Поток данных отбрасывает огромное количество событий из-за объекта Window или внутренней обработки - PullRequest
0 голосов
/ 08 октября 2019

Недавно разрабатывался потребитель Dataflow, который считывал из подписки PubSub и выводил в файлы Parquet комбинацию всех этих объектов, сгруппированных в одном окне.

Пока я выполнял тестирование этого без огромной загрузки всегоказалось, работает нормально.

Однако после выполнения некоторых тяжелых испытаний я вижу, что из 1.000.000 событий , отправленных в эту очередь PubSub, только 1000 переходит в Parquet !

Согласно нескольким временам стены на разных этапах, тот, который анализирует события до применения окна, кажется, длится 58 минут. Последняя стадия записи в файлы Parquet длится 1 час и 32 минуты .

Теперь я покажу наиболее важные части кода внутри, надеюсь, вы сможете пролить свет на то, если это из-залогика, которая предшествует определению объекта Window или если это сам объект Window.

pipeline
        .apply("Reading PubSub Events",
            PubsubIO.readMessagesWithAttributes()
                .fromSubscription(options.getSubscription()))
        .apply("Map to AvroSchemaRecord (GenericRecord)",
            ParDo.of(new PubsubMessageToGenericRecord()))
        .setCoder(AvroCoder.of(AVRO_SCHEMA))
        .apply("15m window",
            Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
                .triggering(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(1)))
                .withAllowedLateness(Duration.ZERO)
                .accumulatingFiredPanes()
        )

Также обратите внимание, что я использую Beam 2.9.0.

Может ли логика на втором этапебыть слишком тяжелым, чтобы сообщения приходили слишком поздно и отбрасывались в окне? Логика в основном состоит в чтении полезной нагрузки, разборе в POJO (чтение атрибутов внутренней карты, фильтрация и т. Д.)

Однако, если я отправил миллион событий в PubSub, все эти миллионы событий сделают это, пока Паркет не запишет вэтап файла, но тогда эти файлы Parquet не содержат все эти события, только частично. Имеет ли это смысл?

Мне понадобится триггер, чтобы использовать все эти события независимо от задержки.

1 Ответ

1 голос
/ 12 октября 2019

Ссылаясь на ответ в списке рассылки Apache Beam:

Это неприятная проблема с удобством использования триггеров, когда вы можете случайно закрыть окно и удалить все данные. Я думаю, что вместо этого вам, вероятно, нужен этот триггер:

Repeatedly.forever(
    AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1)))

Способ, которым я рекомендую выразить этот триггер:

AfterWatermark.pastEndOfWindow().withEarlyFirings(
    AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1)))

Во втором случае невозможно случайно "закрыть"окно и сбросьте все данные.

...