scio Pipeline не может быть изменен после выполнения ScioContext, как запускать две разные задачи / конвейеры одну за другой - PullRequest
0 голосов
/ 14 января 2020

Я использую Scio v.0.8.0 (то есть больше нет фьючерсов). Мне нужно извлечь некоторые записи из BigQuery и сохранить их обратно в BQ. После успешного сохранения мне нужно опубликовать sh пропущенных записей в pubsub после определенного числа попыток.

val (sc, scioArgs) = ContextAndArgs(args)
sc.bigQuerySelect(Query(reconciledQueryString))
    .map(****)
    .saveAsTypedBigQueryTable(Table.Spec(bigQueryConfig.getBigQueryPath("***")

val res = sc.run().waitUntilDone()

if(res.state == PipelineResult.State.DONE) {
    val (sc2, scioArgs2) = ContextAndArgs(args)

    sc.bigQuerySelect(Query(recoveryQueryString))
    .map(****)
    .saveAsCustomOutput("PubsubOut", pubsubOut(""****"")
        sc2.run().waitUntilDone()
}

Второе задание зависит от первого. Если я запускаю обе задачи в одном и том же контексте один за другим, я получаю «Конвейер не может быть изменен после выполнения ScioContext», если я создаю второй контекст после завершения первого, тогда задание фактически отправляет два задания (что, вероятно, нормально ). Есть ли способ организовать две задачи в одном контексте и как-то добавить зависимость между ними?

...