Spark - это способ go на этом.
При разговоре с корзиной S3 с большим количеством файлов, мы всегда должны помнить, что перечисление всех объектов в корзине стоит дорого, так как он возвращает 1000 объектов за раз и указатель для выборки следующего набора. Это делает очень трудным распараллеливание, если вы не знаете структуру и не используете ее для оптимизации этих вызовов.
Извините, если код не работает, я использую scala, но это должно быть почти в рабочее состояние.
Зная, что ваша структура: bucket/account_identifier/Patient/patient_identifier
:
# account_identifiers -- provided from DB
accounts_df = sc.parallelize(account_identifiers, number_of_partitions)
paths = accounts_df.mapPartitions(fetch_files_for_account).collect()
df = spark.read.json(paths)
def fetch_files_for_account(accounts):
s3 = boto3.client('s3')
result = []
for a in accounts:
marker = ''
while True:
request_result = s3.list_objects(Bucket=args.sourceBucket, Prefix=a)
items = request_result['Contents']
for i in items:
obj_datetime = i['LastModified'].replace(tzinfo=None)
if obj_datetime > max_previous_data_extracted_timestamp_datetime and obj_datetime <= start_time_datetime:
result.append('s3://' + args.sourceBucket +'/' + i['Key'])
if not request_result['IsTruncated']:
break
else:
marker = request_result['Marker']
return iter(result)
Разделы карты будут следить за тем, чтобы не было создано слишком много клиентов. Вы можете управлять этим номером, используя number_of_partitions
.
Другая оптимизация, которую вы можете сделать, - это вручную загружать содержимое после вызова mapPartitions
вместо использования collect()
. После этого этапа у вас будет String
с содержанием JSON, а затем вы позвоните spark.createDataFrame(records, schema)
. Примечание: вы должны предоставить схему.
Если у вас нет account_identifiers
или количество файлов не попадет на территорию 100 тыс., Вам придется перечислить все объекты в корзине, отфильтровать по last_modified
и в основном делают один и тот же звонок:
spark.read.json(paths)