У меня есть python конвейер потока данных, который получает уведомления от pubsub, считывает файлы из хранилища, преобразует их и затем загружает их в BigQuery.
Мне нужно выполнить обратную засыпку, которая значительно увеличивает объем за счет трубопровод, и это увеличивает количество работников, чтобы справиться с этим, как и ожидалось. Впоследствии, когда громкость достаточно мала для одного работника, она не масштабируется автоматически. Я вижу, что получаю много ошибок о длительных шагах, как показано ниже:
Error message from worker: Operation ongoing in step s03 for at least 04h30m00s without
outputting or completing in state finish at sun.misc.Unsafe.park(Native Method) at
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) at
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) at
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(
RegisterAndProcessBundleOperation.java:332) at
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85) at
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125) at
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1350) at
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:152) at
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1073) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at
java.lang.Thread.run(Thread.java:748)
Есть ли способ остановить или прервать эти длинные задачи? Например, установить лимит времени, который должен завершить шаг?
Я полагаю, что именно это останавливает мой конвейер, так что это будет дорогостоящим, чтобы он оставался включенным все время, пока я sh.