Я использую Scio v.0.8.0 (то есть больше нет фьючерсов). Мне нужно извлечь некоторые записи из BigQuery и сохранить их обратно в BQ. После успешного сохранения мне нужно опубликовать sh пропущенных записей в pubsub после определенного числа попыток.
val (sc, scioArgs) = ContextAndArgs(args)
sc.bigQuerySelect(Query(reconciledQueryString))
.map(****)
.saveAsTypedBigQueryTable(Table.Spec(bigQueryConfig.getBigQueryPath("***")
val res = sc.run().waitUntilDone()
if(res.state == PipelineResult.State.DONE) {
val (sc2, scioArgs2) = ContextAndArgs(args)
sc.bigQuerySelect(Query(recoveryQueryString))
.map(****)
.saveAsCustomOutput("PubsubOut", pubsubOut(""****"")
sc2.run().waitUntilDone()
}
Второе задание зависит от первого. Если я запускаю обе задачи в одном и том же контексте один за другим, я получаю «Конвейер не может быть изменен после выполнения ScioContext», если я создаю второй контекст после завершения первого, тогда задание фактически отправляет два задания (что, вероятно, нормально ). Есть ли способ организовать две задачи в одном контексте и как-то добавить зависимость между ними?