Замедление / отставание в потоковом конвейере пучка в группе по ключевой стадии - PullRequest
0 голосов
/ 06 марта 2020

Контекст

Привет всем, я использовал Apache Beam конвейеры для генерации столбчатой ​​БД для хранения в GCS, у меня есть поток данных, поступающий из Kafka, и окно 1м.

Я хочу преобразовать все данные этого 1-метрового окна в столбчатый файл БД (ORC в моем случае, может быть Parquet или что-то еще), я написал конвейер для этого преобразования.

Проблема

Я испытываю общую медлительность. Я подозреваю, что это может быть связано с преобразованием группы по ключу, поскольку у меня есть только ключ. Есть ли необходимость в этом? Если нет, то что нужно сделать вместо этого? Я читал, что объединение не очень полезно для этого, так как мой конвейер на самом деле не агрегирует данные, а создает объединенный файл. Что мне точно нужно, так это итеративный список объектов в каждом окне, который будет преобразован в файлы OR C.

Представление конвейера

input -> window -> group by key ( только 1 ключ) -> pardo (для создания БД) -> IO (для записи в GCS)

Что я пробовал

Я пробовал использовать профилировщик, масштабирование по горизонтали / вертикально. Используя профилировщик, я видел, как более 50% времени приходило в группу по ключевой операции. Я действительно считаю, что проблема в горячих клавишах, но я не могу найти решение о том, что должно быть сделано. Когда я удалил операцию group by key, мой конвейер продолжает работать с задержкой Kafka (ie, в конце Kafka это, похоже, не проблема).

Фрагмент кода

p.apply("ReadLines", KafkaIO.<Long, byte[]>read().withBootstrapServers("myserver.com:9092")
    .withTopic(options.getInputTopic())
    .withTimestampPolicyFactory(MyTimePolicy.myTimestampPolicyFactory())
    .withConsumerConfigUpdates(Map.of("group.id", "mygroup-id")).commitOffsetsInFinalize()
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(ByteArrayDeserializer.class).withoutMetadata())
    .apply("UncompressSnappy", ParDo.of(new UncompressSnappy()))
    .apply("DecodeProto", ParDo.of(new DecodePromProto()))
    .apply("MapTSSample", ParDo.of(new MapTSSample()))
    .apply(Window.<TSSample>into(FixedWindows.of(Duration.standardMinutes(1)))
        .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
    .apply(WithKeys.<Integer, TSSample>of(1))
    .apply(GroupByKey.<Integer, TSSample>create())
    .apply("CreateTSORC", ParDo.of(new CreateTSORC()))
    .apply(new WriteOneFilePerWindow(options.getOutput(), 1));

Wall Time Profile

https://gist.github.com/anandsinghkunwar/4cc26f7e3da7473af66ce9a142a74c35

1 Ответ

0 голосов
/ 31 марта 2020

Проблема действительно заключается в проблеме с горячими клавишами , мне пришлось изменить конвейер, чтобы создать собственный ввод-вывод для файлов OR C и увеличить количество шардов до 50 для моего дело. Я полностью удалил GroupByKey. Поскольку у луча еще нет автоматического определения количества осколков для FileIO.write(), вам придется вручную выбрать число, соответствующее вашей рабочей нагрузке.

Кроме того, включение API потокового движка в Google Dataflow еще больше ускорило прием пищи.

...