BigQueryIO: запрос настраивается с помощью параметров, но «Значение доступно только во время выполнения» - PullRequest
0 голосов
/ 24 февраля 2019

Apache Beam 2.9.0

Я настроил конвейер, который извлекает данные из BigQuery и выполняет серию преобразований.К опциям присваивается начальная дата с использованием ValueProvider:

ValueProvider<String> getStartTime();

void setStartTime(ValueProvider<String> startTime);

. Затем я собираюсь извлечь данные с помощью BigQueryIO (что-то немного изменилось, чтобы сделать его явным, чтопроисходит):

BigQueryIO.read(
            (SerializableFunction<SchemaAndRecord, AggregatedRowRecord>)
                input -> new BigQueryParser().apply(input.getRecord()))
        .withoutValidation()
        .withTemplateCompatibility()
        .fromQuery(
            ValueProvider.NestedValueProvider.of(
                opts.getStartTime(),
                (SerializableFunction<String, String>)
                    input -> {
                      Instant instant = Instant.parse(input);

                      return String.format(
                          <large SQL statement with a %s in it>,
                          String.format(
                              "%d_%d_%d",
                              instant.get(ChronoField.YEAR),
                              instant.get(ChronoField.MONTH_OF_YEAR),
                              instant.get(ChronoField.DAY_OF_MONTH)));
                    }))
        .withCoder(<coder for AggregatedRowRecords>)
        .usingStandardSql()

Затем он добавляется в конвейер в обычном режиме (p.apply(<above>)).

Теперь я его запускаю:

--project=<project> \
--tempLocation=<directory> \
--stagingLocation=<directory> \
--network=dataflow \
--subnetwork=<subnetwork> \
--defaultWorkerLogLevel=DEBUG
--appName=<name>
--runner=DirectRunner

Это вызываетследующая ошибка:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=startTime, default=null}
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
        at <class>.main(<class>.java:<>)
Caused by: java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=startTime, default=null}
        at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:228)
        at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryQuerySource.createBasicQueryConfig(BigQueryQuerySource.java:230)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryQuerySource.dryRunQueryIfNeeded(BigQueryQuerySource.java:175)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryQuerySource.getTableToExtract(BigQueryQuerySource.java:115)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase.extractFiles(BigQuerySourceBase.java:102)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead$2.processElement(BigQueryIO.java:783)

Использование NestedValueProvider происходит от этого примера при настройке шаблонов :

The user provides a substring for a BigQuery query, such as a specific date. The transform uses the substring to create the full query. Calling .get() returns the full query.

УдалениеОднако логика провайдера значений, похоже, не помогает.Полное удаление ValueProvider из раздела withQuery работает нормально, но не позволяет установить его с помощью параметров.

1 Ответ

0 голосов
/ 26 февраля 2019

Исключение объясняет вам проблему, Apache Beam сначала строит конвейер и классы, а затем начинает запускать данные в конвейере, на этом этапе вы не можете получить доступ к опциям, это просто метаданные для построения конвейера.

Способ преодолеть это - создать функцию ParDo / PTransform, которая получит нужные параметры в качестве параметров в конструкторе, а затем получит к нему доступ в своей логике.

См.пример: (в моем случае использования я столкнулся с той же проблемой в последние дни)

Конвейер:

   HistoryProcessingOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(HistoryProcessingOptions.class);
        Pipeline pipeline = Pipeline.create(options);

  pipeline.apply(SourceRead.of(options.getSourceBigQueryTable().get(),
            options.getSourceBigQueryDataset().get(),
            options.getSourceBigQueryProject().get(),
            options.getFromDate().get(),
            options.getToDate().get()
            ))

Сам преобразователь:

public class SourceRead extends PTransform<PBegin, PCollection<TableRow>> {

private String sourceBigQueryTable;

private String sourceBigQueryDataset;

private String sourceBigQueryProject;

private String formDate;

private String toDate;

private static Logger logger = LoggerFactory.getLogger(SourceRead.class);


public SourceRead(String sourceBigQueryTable, String sourceBigQueryDataset, String sourceBigQueryProject, String formDate, String toDate) {
    this.sourceBigQueryTable = sourceBigQueryTable;
    this.sourceBigQueryDataset = sourceBigQueryDataset;
    this.sourceBigQueryProject = sourceBigQueryProject;
    this.formDate = formDate;
    this.toDate = toDate;
}

public static SourceRead of(String sourceBigQueryTable, String sourceBigQueryDataset, String sourceBigQueryProject, String yearToLoad, String dateToLoad) {
    return new SourceRead(sourceBigQueryTable, sourceBigQueryDataset, sourceBigQueryProject, yearToLoad, dateToLoad);
}





@Override
public PCollection<TableRow> expand(PBegin input) {
    String query = "SELECT * FROM TABLE_DATE_RANGE([" + sourceBigQueryProject + ":"+sourceBigQueryDataset+"."+sourceBigQueryTable+"],"
            + "TIMESTAMP('" + formDate + "'),"
            + "TIMESTAMP('" + toDate + "'))";
    logger.info("query is"+ query);
    return input.apply(BigQueryIO.readTableRows()
            .fromQuery(query));
}
...