Мы запускаем pyspark в кластере EMR и имеем ~ 50 миллионов записей в кадре данных.Каждому требуется поле, добавленное к нему из API, который принимает 100 записей за один раз (итого ~ 500 тыс. Запросов).Мы можем их разделить и успешно выполнять вызовы API, однако иногда мы ограничиваем скорость.Когда это происходит, процесс продолжает отправлять запросы, и все они возвращают один и тот же ограниченный по скорости ответ.Поэтому, когда это происходит, мы хотим полностью остановить все запросы со всех подчиненных узлов и завершить работу.
Мы сократили размер кластера, чтобы избежать этой проблемы, но поскольку время отклика не всегда одинаково, нам нужен способ выйти и прекратить отправку запросов, если мы уже ограничены в скорости.
Мы используем mapPartitions()
на фрейме данных, и внутри него вызываем API.
Я ищу способ в функции, вызываемой mapPartitions()
, чтобы остановить все процессы на всех подчиненных узлах, поэтому, когда мы впервые замечаем, что мы ограничены в скорости, все вызовы API прекращаются.