Привет У меня есть пара запросов, которые я хочу выполнить и сохранять результаты в последовательности один за другим, используя Apache Луч, я видел несколько похожих вопросов, но не смог найти ответ. Я привык проектировать трубопроводы, используя Airflow, и я довольно плохо знаком с Apache Beam. Я использую бегун Dataflow. Вот мой код прямо сейчас: я бы хотел, чтобы query2 выполнялся только после того, как результаты query1 будут сохранены в соответствующей таблице. Как их объединить?
PCollection<TableRow> resultsStep1 = getData("Run Query 1",
"Select * FROM basetable");
resultsStep1.apply("Save Query1 data",
BigQueryIO.writeTableRows()
.withSchema(BigQueryUtils.toTableSchema(resultsStep1.getSchema()))
.to("resultsStep1")
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
);
PCollection<TableRow> resultsStep2 = getData("Run Query 2",
"Select * FROM resultsStep1");
resultsStep2.apply("Save Query2 data",
BigQueryIO.writeTableRows()
.withSchema(BigQueryUtils.toTableSchema(resultsStep2.getSchema()))
.to("resultsStep2")
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
);
А вот мое определение функции getData:
private PCollection<TableRow> getData(final String taskName, final String query) {
return pipeline.apply(taskName,
BigQueryIO.readTableRowsWithSchema()
.fromQuery(query)
.usingStandardSql()
.withCoder(TableRowJsonCoder.of()));
}
Редактировать (обновить): Получается: You can’t sequence the completion of a BigQuery write with other steps of your pipeline.
Что я считаю большим ограничением для проектирования трубопроводов. Источник: https://beam.apache.org/documentation/io/built-in/google-bigquery/#limitations