Последовательное выполнение в Apache Beam - Java SDK 2.18.0 - PullRequest
0 голосов
/ 19 марта 2020

Привет У меня есть пара запросов, которые я хочу выполнить и сохранять результаты в последовательности один за другим, используя Apache Луч, я видел несколько похожих вопросов, но не смог найти ответ. Я привык проектировать трубопроводы, используя Airflow, и я довольно плохо знаком с Apache Beam. Я использую бегун Dataflow. Вот мой код прямо сейчас: я бы хотел, чтобы query2 выполнялся только после того, как результаты query1 будут сохранены в соответствующей таблице. Как их объединить?

    PCollection<TableRow> resultsStep1 = getData("Run Query 1",
            "Select * FROM basetable");

    resultsStep1.apply("Save Query1 data",
            BigQueryIO.writeTableRows()
                    .withSchema(BigQueryUtils.toTableSchema(resultsStep1.getSchema()))
                    .to("resultsStep1")
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    );

    PCollection<TableRow> resultsStep2 = getData("Run Query 2",
            "Select * FROM resultsStep1");

    resultsStep2.apply("Save Query2 data",
            BigQueryIO.writeTableRows()
                    .withSchema(BigQueryUtils.toTableSchema(resultsStep2.getSchema()))
                    .to("resultsStep2")
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    );

А вот мое определение функции getData:

private PCollection<TableRow> getData(final String taskName, final String query) {
    return pipeline.apply(taskName,
            BigQueryIO.readTableRowsWithSchema()
                    .fromQuery(query)
                    .usingStandardSql()
                    .withCoder(TableRowJsonCoder.of()));
}

Редактировать (обновить): Получается: You can’t sequence the completion of a BigQuery write with other steps of your pipeline.

Что я считаю большим ограничением для проектирования трубопроводов. Источник: https://beam.apache.org/documentation/io/built-in/google-bigquery/#limitations

1 Ответ

3 голосов
/ 19 марта 2020

Вы можете использовать метод Wait, чтобы сделать это. Придуманный пример ниже

 PCollection<Void> firstWriteResults = data.apply(ParDo.of(...write to first database...));
 data.apply(Wait.on(firstWriteResults))
     // Windows of this intermediate PCollection will be processed no earlier than when
     // the respective window of firstWriteResults closes.
     .apply(ParDo.of(...write to second database...));

Более подробную информацию можно найти в документации API, представленной здесь - https://beam.apache.org/releases/javadoc/2.17.0/index.html?org / apache / beam / sdk / transforms / Wait. html

...