Контекст
Привет всем, я использовал Apache Beam
конвейеры для генерации столбчатой БД для хранения в GCS
, у меня есть поток данных, поступающий из Kafka
, и окно 1м.
Я хочу преобразовать все данные этого 1-метрового окна в столбчатый файл БД (ORC
в моем случае, может быть Parquet
или что-то еще), я написал конвейер для этого преобразования.
Проблема
Я испытываю общую медлительность. Я подозреваю, что это может быть связано с преобразованием группы по ключу, поскольку у меня есть только ключ. Есть ли необходимость в этом? Если нет, то что нужно сделать вместо этого? Я читал, что объединение не очень полезно для этого, так как мой конвейер на самом деле не агрегирует данные, а создает объединенный файл. Что мне точно нужно, так это итеративный список объектов в каждом окне, который будет преобразован в файлы OR C.
Представление конвейера
input -> window -> group by key ( только 1 ключ) -> pardo (для создания БД) -> IO (для записи в GCS)
Что я пробовал
Я пробовал использовать профилировщик, масштабирование по горизонтали / вертикально. Используя профилировщик, я видел, как более 50% времени приходило в группу по ключевой операции. Я действительно считаю, что проблема в горячих клавишах, но я не могу найти решение о том, что должно быть сделано. Когда я удалил операцию group by key
, мой конвейер продолжает работать с задержкой Kafka
(ie, в конце Kafka
это, похоже, не проблема).
Фрагмент кода
p.apply("ReadLines", KafkaIO.<Long, byte[]>read().withBootstrapServers("myserver.com:9092")
.withTopic(options.getInputTopic())
.withTimestampPolicyFactory(MyTimePolicy.myTimestampPolicyFactory())
.withConsumerConfigUpdates(Map.of("group.id", "mygroup-id")).commitOffsetsInFinalize()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class).withoutMetadata())
.apply("UncompressSnappy", ParDo.of(new UncompressSnappy()))
.apply("DecodeProto", ParDo.of(new DecodePromProto()))
.apply("MapTSSample", ParDo.of(new MapTSSample()))
.apply(Window.<TSSample>into(FixedWindows.of(Duration.standardMinutes(1)))
.withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
.apply(WithKeys.<Integer, TSSample>of(1))
.apply(GroupByKey.<Integer, TSSample>create())
.apply("CreateTSORC", ParDo.of(new CreateTSORC()))
.apply(new WriteOneFilePerWindow(options.getOutput(), 1));
Wall Time Profile
https://gist.github.com/anandsinghkunwar/4cc26f7e3da7473af66ce9a142a74c35