Поток данных Google - как остановить длительные шаги - PullRequest
1 голос
/ 26 февраля 2020

У меня есть python конвейер потока данных, который получает уведомления от pubsub, считывает файлы из хранилища, преобразует их и затем загружает их в BigQuery.

Мне нужно выполнить обратную засыпку, которая значительно увеличивает объем за счет трубопровод, и это увеличивает количество работников, чтобы справиться с этим, как и ожидалось. Впоследствии, когда громкость достаточно мала для одного работника, она не масштабируется автоматически. Я вижу, что получаю много ошибок о длительных шагах, как показано ниже:

Error message from worker: Operation ongoing in step s03 for at least 04h30m00s without 
outputting or completing in state finish at sun.misc.Unsafe.park(Native Method) at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at 
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) at 
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(
RegisterAndProcessBundleOperation.java:332) at 
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85) at 
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125) at 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1350) at 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:152) at 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1073) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at 
java.lang.Thread.run(Thread.java:748)

Есть ли способ остановить или прервать эти длинные задачи? Например, установить лимит времени, который должен завершить шаг?

Я полагаю, что именно это останавливает мой конвейер, так что это будет дорогостоящим, чтобы он оставался включенным все время, пока я sh.

...