Чтение файлов S3, которые встречают последнее измененное окно, в DataFrame - PullRequest
1 голос
/ 20 февраля 2020

У меня есть корзина S3 с объектами, где Last Modified варьируется от очень старого до текущего. Мне нужно быть в состоянии найти файлы с последним измененным штампом в окне, а затем прочитать эти файлы (которые JSON) в некоторый тип фрейма данных (pandas, spark, et c.).

Я попытался собрать файлы, прочитать их по отдельности и добавить с помощью следующего кода, но это очень медленно:

session = boto3.session.Session(region_name=region)

#Gather all keys that have a modified stamp between max_previous_data_extracted_timestamp and start_time_proper
s3 = session.resource('s3', region_name=region)
bucket = s3.Bucket(args.sourceBucket)
app_body = []
for obj in bucket.objects.all():
    obj_datetime = obj.last_modified.replace(tzinfo=None)
    if args.accountId + '/Patient' in obj.key and obj_datetime > max_previous_data_extracted_timestamp_datetime and obj_datetime <= start_time_datetime:
        obj_df = pd.read_csv(obj.get()['Body'])
        app_body.append(obj_df)

merged_dataframe = pd.concat(app_body)

Logi c работает только в том случае, если я получить объекты, которые были изменены в пределах окна, однако следующая часть, где оно получает тело и добавляет его в список, выполняется в течение 30-45 минут для файлов ~ 10K. Должен быть лучший способ сделать это, о котором я просто не думаю.

1 Ответ

1 голос
/ 20 февраля 2020

Spark - это способ go на этом.

При разговоре с корзиной S3 с большим количеством файлов, мы всегда должны помнить, что перечисление всех объектов в корзине стоит дорого, так как он возвращает 1000 объектов за раз и указатель для выборки следующего набора. Это делает очень трудным распараллеливание, если вы не знаете структуру и не используете ее для оптимизации этих вызовов.

Извините, если код не работает, я использую scala, но это должно быть почти в рабочее состояние.

Зная, что ваша структура: bucket/account_identifier/Patient/patient_identifier:

# account_identifiers -- provided from DB
accounts_df = sc.parallelize(account_identifiers, number_of_partitions)
paths = accounts_df.mapPartitions(fetch_files_for_account).collect()
df = spark.read.json(paths)


def fetch_files_for_account(accounts):
    s3 = boto3.client('s3')
    result = []
    for a in accounts:
        marker = ''
        while True:
            request_result = s3.list_objects(Bucket=args.sourceBucket, Prefix=a)
            items = request_result['Contents']
            for i in items:
                obj_datetime = i['LastModified'].replace(tzinfo=None)
                if obj_datetime > max_previous_data_extracted_timestamp_datetime and obj_datetime <= start_time_datetime:
                    result.append('s3://' + args.sourceBucket +'/' + i['Key'])
            if not request_result['IsTruncated']:
                break
            else:
                marker = request_result['Marker']
    return iter(result)

Разделы карты будут следить за тем, чтобы не было создано слишком много клиентов. Вы можете управлять этим номером, используя number_of_partitions.

Другая оптимизация, которую вы можете сделать, - это вручную загружать содержимое после вызова mapPartitions вместо использования collect(). После этого этапа у вас будет String с содержанием JSON, а затем вы позвоните spark.createDataFrame(records, schema). Примечание: вы должны предоставить схему.

Если у вас нет account_identifiers или количество файлов не попадет на территорию 100 тыс., Вам придется перечислить все объекты в корзине, отфильтровать по last_modified и в основном делают один и тот же звонок:

spark.read.json(paths)
...