Поток данных: доступ к поставщику значений после завершения конвейера - PullRequest
0 голосов
/ 24 января 2019

Я пытаюсь сделать запаздывающее обновление после того, как мой конвейер завершит, где таблицы передаются во время выполнения из-за версии версий. Поскольку этот код выполняется как шаблон, он требует использования nestedValueProviders.

public interface DataQueryRunnerOptions extends DataflowPipelineOptions {

@Description("Table to read/write payload data.")
    @Default.String("test.payloadData")
    ValueProvider<String> getPayloadTable();

@Description("Table to read eligibility data from, and update with payloadData")
    @Default.String("test.dqr_test_eligibilities")
    ValueProvider<String> getEligibilityInputTable();

}

Использование в трубопроводе:

campaignIdToDataQueryMap.apply("RunDataQueries", ParDo.of(new RunDataQueries()))
      .apply("WritePayloadDataToTable", BigQueryIO.writeTableRows()
        .withSchema(getPayloadDataSchema())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .to(options.getPayloadTable()));

Затем я вызываю код после конвейера:

pipeline.run().waitUntilFinish();

runFinalUpdate(options);

Для метода runFinalUpdate:

private static void runFinalUpdate(DataQueryRunner2Options options) {

    ValueProvider.NestedValueProvider eligTable = ValueProvider.NestedValueProvider.of(
      options.getEligibilityInputTable(),
      (SerializableFunction<String, String>) eligibilityInputTable -> options.getEligibilityInputTable().get()
    );

    ValueProvider.NestedValueProvider payloadTable = ValueProvider.NestedValueProvider.of(
      options.getPayloadTable(),
      (SerializableFunction<String, String>) payload -> options.getPayloadTable().get()
    );

    String finalUpdate = "UPDATE " + eligTable.get() + " elig SET elig.dataQueryPayload = (SELECT pd.dataQueryPayload FROM `"
      + payloadTable.get() + "` pd WHERE pd.numericId = elig.numericId and pd.campaignId = elig.campaignId)"
      + " WHERE elig.dataQueryPayload IS NULL";

    try {
      Utilities.runQuery(finalUpdate);
    } catch (InterruptedException e) {
      LOG.error("Final update failure: " + e.getMessage());
      e.printStackTrace();
    }
  }

Это дает ошибку:

java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=eligibilityInputTable, default=test.dqr_test_eligibilities}

Как я могу получить доступ к этому значению вне моего конвейера? Есть ли лучший способ выполнить работу «один раз» после завершения конвейера?

1 Ответ

0 голосов
/ 07 февраля 2019

Интерфейс ValueProvider позволяет конвейерам принимать параметры времени выполнения. Чтобы получить доступ к этим значениям для целей отчетности / ведения журнала, вам необходимо получить к ним доступ в DAG Beam. Возможное решение для этого - создать в вашем конвейере отчетную ветвь, которая принимает одно фиктивное значение, а внутри DoFn, который «обрабатывает» это фиктивное значение, экспортировать опции во внешнее хранилище.

Java (SDK 2.9.0):

public interface YourOptions extends PipelineOptions {
 @Description("Your option")
 @Default.String("Hello World!")
 ValueProvider<String> getStringValue();

 void setStringValue(ValueProvider<String>  value);
}

public static void main(String[] args) {

 // Create pipeline
 YourOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
     .as(YourOptions.class);

 Pipeline p = Pipeline.create(options);

 // Branch for pushing the Value Provider value

 p.apply(Create.of(1)).apply(ParDo.of(new DoFn<Integer, Integer>() {

   @ProcessElement public void process(ProcessContext c) {

     YourOptions ops = c.getPipelineOptions().as(YourOptions.class);
     // Do something like push to DB here....
     LOG.info("Option StringValue was {}" , ops.getStringValue());

   }
 }));

 // The main pipeline....
 p.apply(Create.of(1,2,3,4)).apply(Sum.integersGlobally());

 p.run();
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...