Apache Луч потока данных не принимает ValueProvider для запроса BigQuery - PullRequest
1 голос
/ 21 февраля 2020

Цель

Моя цель - создать шаблон потока данных, который задает конвейер луча Apache. Конвейер работает в пакетном режиме, читает из BigQuery, затем выполняет преобразования и пишет в другое место. Самое важное, что запрос, который я использую для чтения из BigQuery, должен быть предоставлен во время выполнения.

Ожидаемое поведение

Ожидаемый результат - конвейер будет использовать параметр времени выполнения для указания запроса BigQuery, выполнить команду запрос, а затем продолжить с остальной частью конвейера.

Фактическое поведение

Фактическое поведение - параметр времени выполнения, в который я передаю, игнорируется, и вместо этого параметр I имел , который указывается при создании шаблона GCS.

Соответствующий код

Ниже описано, как я определяю операцию чтения, и как определяется и передается параметр запроса.

public interface MyOptions extends PipelineOptions, StreamingOptions {
    @Description("Query String")
    ValueProvider<String> getQueryString();

    void setQueryString(ValueProvider<String> value);
}

public static void main(String[] args) {
        MyOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(MyOptions.class);
        Pipeline p = Pipeline.create(options);

        PCollection<TableRow> tableRows =
                p.apply(BigQueryIO.readTableRows()
                        .fromQuery(options.getQueryString())
                        .withTemplateCompatibility()
                        .withoutValidation());
// Add this point I run my transformations and loading
}

Для фактического построения шаблона и вывода sh для GCS, я делаю следующее

mvn compile -Pdataflow-runner exec:java -Dexec.mainClass=com.Pipeline "-Dexec.args=--runner=DataflowRunner --queryString='SELECT time,type FROM [my-project:timeseries.my-data] where time between TIMESTAMP(\"2020-02-13T00:00:00Z\") and TIMESTAMP(\"2020-02-15T00:00:00Z\")'"

Наконец, я использую веб-интерфейс Dataflow, чтобы выбрать шаблон из GCS и выполнить развертывание. В нижней части веб-интерфейса я указываю свои параметры времени выполнения, где я задаю queryString и запрос времени выполнения, который я хочу использовать.

Примечание: при I go чтобы запустить шаблон в потоке данных, я указываю queryString и я точно знаю, что он передается. Я переписал свое первое преобразование, чтобы вывести queryString , и он правильно печатает указанное вариант времени выполнения. Проблема в том, что «read from BigQuery» queryString по-прежнему является исходным, использованным при создании шаблона.

1 Ответ

2 голосов
/ 21 февраля 2020

После многих итераций я разобрался в проблеме. На самом деле их было 2, самое большое из которых мне не нужно было фактически передавать параметр времени выполнения в шаг «шаблона сборки».

  1. Не передавать параметр времени выполнения при построении конвейера. Это кажется очевидным, но отбросить это из mvn compile args
  2. Форматирование queryString как параметра времени выполнения было трудным. Ниже работал для меня после многих итераций
SELECT time,type FROM `my-project.timeseries.my-data` where time between TIMESTAMP(\"2019-02-13T00:00:00Z\") and TIMESTAMP(\"2020-02-15T00:00:00Z\")

Обратите внимание на отсутствие кавычек вокруг всего параметра и как был отформатирован projectId.dataset.tableId.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...