Я строю несколько потоковых конвейеров данных, которые читают из Kafka и пишут в различные приемники, используя Google Cloud Dataflow. Конвейер выглядит примерно так (упрощенно).
// Example pipeline that writes to BigQuery.
Pipeline.create(options)
.apply(KafkaIO.read().withTopic(options.topic))
.apply(/* Convert to a Row type */)
.setRowSchema(schemaRegistry.lookup(options.topic))
.apply(
BigQueryIO.write<Row>()
.useBeamSchema()
.withCreateDisposition(CreateDispotion.CREATE_IF_NEEDED)
.withProject(options.outputProject)
.withDataset(options.outputDataset)
.withTable(options.outputTable)
)
Я планирую запустить конвейер для каждой из наших тем Kafka, которых сотни. Конвейер ищет схему для данной топики c на этапе планирования. Это позволяет BigQueryIO
создавать необходимые таблицы перед запуском конвейера.
Вопрос: Как я могу поддерживать развивающиеся схемы в моих конвейерах потока данных?
Я исследовал возможность обновления существующего задания потока данных (с использованием флага --update
). Мысль заключается в том, что я могу автоматизировать процесс отправки обновленного задания при изменении схемы. Но при обновлении задания время простоя составляет около 3 минут. Для некоторых рабочих мест такое большое время простоя не сработает. Я ищу другие решения, которые, как мы надеемся, имеют не более нескольких секунд простоя.