Цель
Моя цель - создать шаблон потока данных, который задает конвейер луча Apache. Конвейер работает в пакетном режиме, читает из BigQuery, затем выполняет преобразования и пишет в другое место. Самое важное, что запрос, который я использую для чтения из BigQuery, должен быть предоставлен во время выполнения.
Ожидаемое поведение
Ожидаемый результат - конвейер будет использовать параметр времени выполнения для указания запроса BigQuery, выполнить команду запрос, а затем продолжить с остальной частью конвейера.
Фактическое поведение
Фактическое поведение - параметр времени выполнения, в который я передаю, игнорируется, и вместо этого параметр I имел , который указывается при создании шаблона GCS.
Соответствующий код
Ниже описано, как я определяю операцию чтения, и как определяется и передается параметр запроса.
public interface MyOptions extends PipelineOptions, StreamingOptions {
@Description("Query String")
ValueProvider<String> getQueryString();
void setQueryString(ValueProvider<String> value);
}
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(MyOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<TableRow> tableRows =
p.apply(BigQueryIO.readTableRows()
.fromQuery(options.getQueryString())
.withTemplateCompatibility()
.withoutValidation());
// Add this point I run my transformations and loading
}
Для фактического построения шаблона и вывода sh для GCS, я делаю следующее
mvn compile -Pdataflow-runner exec:java -Dexec.mainClass=com.Pipeline "-Dexec.args=--runner=DataflowRunner --queryString='SELECT time,type FROM [my-project:timeseries.my-data] where time between TIMESTAMP(\"2020-02-13T00:00:00Z\") and TIMESTAMP(\"2020-02-15T00:00:00Z\")'"
Наконец, я использую веб-интерфейс Dataflow, чтобы выбрать шаблон из GCS и выполнить развертывание. В нижней части веб-интерфейса я указываю свои параметры времени выполнения, где я задаю queryString и запрос времени выполнения, который я хочу использовать.
Примечание: при I go чтобы запустить шаблон в потоке данных, я указываю queryString и я точно знаю, что он передается. Я переписал свое первое преобразование, чтобы вывести queryString , и он правильно печатает указанное вариант времени выполнения. Проблема в том, что «read from BigQuery» queryString по-прежнему является исходным, использованным при создании шаблона.