Первый ресурс, который вы должны проверить, это документация Dataflow.Было бы полезно проверить следующее:
Если эти ресурсы не помогают, я постараюсь обобщить некоторые причины, по которым ваша работа может зависнуть, и как вы можете ее отладить.Я разделю эти вопросы в зависимости от того, какая часть системы вызывает проблемы.Ваша работа может быть:
Задание застряло при запуске
Задание может зависнуть при получении службой потока данных или при запуске новых работников потока данных.Вот некоторые факторы риска:
- Вы добавили пользовательский файл
setup.py
? - Есть ли у вас зависимости, требующие специальной настройки при запуске работника?
- Управляете ли вы рабочим контейнером?
Для отладки такого рода проблемы Я обычно открываю ведение журнала StackDriver и ищу журналы worker-startup
(см. Следующий рисунок).Эти журналы записываются рабочим, когда он запускает докер-контейнер с вашим кодом и вашими зависимостями.Если вы обнаружите здесь какую-либо проблему, это будет означать проблему с вашим setup.py
, вашей заявкой на работу, постановочными артефактами и т. Д.
Другая вещьВы можете сохранить ту же настройку и запустить очень маленький конвейер, который все ставит:
with beam.Pipeline(...) as p:
(p
| beam.Create(['test element'])
| beam.Map(lambda x: logging.info(x)))
Если вы не видите свои журналы в StackDriver, вы можете продолжить отладку вашей настройки. Если вы видите журнал в StackDriver, то ваша работа может застрять в другом месте.
Задание кажется застрявшим в коде пользователя
Что-то еще, что может произойти, это то, что ваша работа выполняет некоторую операцию в коде пользователя, который застрял или медленно .Вот некоторые факторы риска:
- Выполняет ли ваша работа операции, требующие их ожидания?(например, загрузка данных во внешнюю службу, ожидание обещаний / фьючерсов)
- Обратите внимание, что некоторые встроенные преобразования Beam делают именно это (например, IO Beam, такие как BigQueryIO, FileIO и т. д.).
- Ваша работа загружает очень большие боковые входы в память?Это может произойти, если вы используете
View.AsList
для бокового ввода. - Загружает ли ваша работа очень большие итерации после
GroupByKey
операций?
Симптом такого рода проблема может заключаться в том, что пропускная способность конвейера ниже, чем вы ожидаете. Другим симптомом является следующая строка в журналах:
Processing stuck in step <STEP_NAME>/<...>/<...> for at least <TIME> without outputting or completing in state <STATE>
.... <a stacktrace> ....
В подобных случаях имеет смысл посмотреть, какой шаг занимает больше всего времени в вашем конвейере, и проверитькод для этого шага, чтобы увидеть, в чем может быть проблема.
Некоторые советы:
Оченьбольшие побочные входы могут быть проблематичными, поэтому, если ваш конвейер полагается на доступ к очень большим боковым входам, вам может потребоваться изменить его, чтобы избежать этого узкого места.
Возможно иметь асинхронные запросыдля внешних служб, но я рекомендую вам совершить / завершить работу для вызовов startBundle
и finishBundle
.
Если пропускная способность вашего конвейера не соответствует ожидаемой, это может бытьпотому что вам не хватает параллелизма.Это может быть исправлено с помощью Reshuffle
или путем разделения ваших существующих ключей на подразделы (Beam часто выполняет обработку для каждого ключа, и поэтому, если у вас слишком мало ключей, ваш параллелизм будет низким) - или вместо этого используйте Combiner
из GroupByKey
+ ParDo
.
Другой причиной низкой пропускной способности может быть то, что ваша работа слишком долго ожидает внешних вызовов.Вы можете попытаться решить эту проблему, испробовав стратегии пакетной обработки или асинхронный ввод-вывод.
В общем, нет серебряной пули для улучшения пропускной способности вашего конвейера, и вам нужно будет поэкспериментировать.
Свежесть данных или системная задержка увеличиваются
Прежде всего, я бы порекомендовал вам проверить эту презентацию по водяным знакам .
Для потоковой передачи продвижение водяных знаков - это то, что заставляет конвейер двигаться вперед, таким образом,важно внимательно следить за тем, что может привести к задержке водяного знака и остановить ваш трубопровод вниз по течению.Некоторые причины, по которым водяной знак может застрять:
- Одна из возможностей заключается в том, что ваш конвейер находится в состоянии неразрешимой ошибки. Если пакет не удается обработать, ваш конвейер будет продолжать пытаться выполнить этот пакет бесконечно , и это удержит водяной знак обратно.
- Когда это произойдет, вы увидите ошибки в вашей консоли Dataflow , и счет будет продолжать расти, когда пакет повторяется.См .:
- При привязке отметок времени к вашим данным может возникнуть ошибка.Убедитесь, что разрешение ваших меток времени является правильным!
- Хотя это маловероятно, но, возможно, вы столкнулись с ошибкой в потоке данных.Если ни один из других советов не поможет, откройте заявку в службу поддержки.