Я пытаюсь сделать запаздывающее обновление после того, как мой конвейер завершит, где таблицы передаются во время выполнения из-за версии версий. Поскольку этот код выполняется как шаблон, он требует использования nestedValueProviders.
public interface DataQueryRunnerOptions extends DataflowPipelineOptions {
@Description("Table to read/write payload data.")
@Default.String("test.payloadData")
ValueProvider<String> getPayloadTable();
@Description("Table to read eligibility data from, and update with payloadData")
@Default.String("test.dqr_test_eligibilities")
ValueProvider<String> getEligibilityInputTable();
}
Использование в трубопроводе:
campaignIdToDataQueryMap.apply("RunDataQueries", ParDo.of(new RunDataQueries()))
.apply("WritePayloadDataToTable", BigQueryIO.writeTableRows()
.withSchema(getPayloadDataSchema())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.to(options.getPayloadTable()));
Затем я вызываю код после конвейера:
pipeline.run().waitUntilFinish();
runFinalUpdate(options);
Для метода runFinalUpdate:
private static void runFinalUpdate(DataQueryRunner2Options options) {
ValueProvider.NestedValueProvider eligTable = ValueProvider.NestedValueProvider.of(
options.getEligibilityInputTable(),
(SerializableFunction<String, String>) eligibilityInputTable -> options.getEligibilityInputTable().get()
);
ValueProvider.NestedValueProvider payloadTable = ValueProvider.NestedValueProvider.of(
options.getPayloadTable(),
(SerializableFunction<String, String>) payload -> options.getPayloadTable().get()
);
String finalUpdate = "UPDATE " + eligTable.get() + " elig SET elig.dataQueryPayload = (SELECT pd.dataQueryPayload FROM `"
+ payloadTable.get() + "` pd WHERE pd.numericId = elig.numericId and pd.campaignId = elig.campaignId)"
+ " WHERE elig.dataQueryPayload IS NULL";
try {
Utilities.runQuery(finalUpdate);
} catch (InterruptedException e) {
LOG.error("Final update failure: " + e.getMessage());
e.printStackTrace();
}
}
Это дает ошибку:
java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=eligibilityInputTable, default=test.dqr_test_eligibilities}
Как я могу получить доступ к этому значению вне моего конвейера? Есть ли лучший способ выполнить работу «один раз» после завершения конвейера?