У меня есть ситуация, когда мне нужно удалить большое количество файлов (сотни миллионов) из S3, и это займет, например, навсегда, если вы используете традиционные подходы (даже используя пакет python boto3
с delete_objects
, чтобы удалить их порциями по 1000 и обработать их локально в 16 многопроцессорных системах)
Итак, я разработал подход с использованием PySpark, где я:
- получаю список файлы, которые мне нужно удалить
- распараллелить его в кадре данных, разбить его по префиксу (учитывая, что у меня есть ограничение в 3500 запросов DELETE / se c на префикс)
- получить базовый RDD и примените
delete_objects
, используя метод .mapPartitions()
СДР - , преобразуйте его в кадр данных снова (
.toDF()
) - run
.cache()
и .count()
, чтобы принудительно выполнить запросы
Эта функция, которую я передаю .mapPartitions()
:
def delete_files(list_of_rows):
for chunk in chunked_iterable(list_of_rows, 1000):
session = boto3.session.Session(region_name='us-east-1')
client = session.client('s3')
files = list(chunk)
bucket = files[0][0]
delete = {'Objects': [{'Key': f[1]} for f in files]}
response = client.delete_objects(
Bucket=bucket,
Delete=delete,
)
yield Row(
deleted=len(response.get('Deleted'))
)
Работает хорошо, за исключением того, что в зависимости от количества файлов я получаю SlowDown
(код состояния 503) исключения, выходящие за пределы 350 0 DELETE запросов / se c для каждого префикса.
Это не имеет смысла для меня, учитывая, что я делю свои строки по префиксу [.repartition("prefix")
] (что означает, что у меня не должно быть того же префикса в нескольких разделов) и сопоставление функции delete_files
с разделом одновременно.
В моей голове не возможно, чтобы я звонил delete_objects
для того же префикса в одно и то же время, и поэтому, Я не могу найти причину продолжать нарушать эти ограничения.
Есть ли что-то еще, что я должен рассмотреть?
Заранее спасибо!