У меня есть фрейм данных, где каждая строка содержит префикс, указывающий на местоположение в S3. Я хочу использовать flatMap () для итерации по каждой строке, перечислить объекты S3 в каждом префиксе и вернуть новый фрейм данных, содержащий строку для файла, который был указан в S3.
У меня есть этот код:
import boto3
s3 = boto3.resource('s3')
def flatmap_list_s3_files(row):
bucket = s3.Bucket(row.bucket)
s3_files = []
for obj in bucket.objects.filter(Prefix=row.prefix):
s3_files.append(obj.key)
rows = []
for f in s3_files:
row_dict = row.asDict()
row_dict['s3_obj'] = f
rows.append(Row(**row_dict))
return rows
df = <code that loads the dataframe>
df.rdd.flatMap(lambda x: flatmap_list_s3_files(x))).toDF()
Единственная проблема в том, что объект s3
не является маринованным? Таким образом, я получаю эту ошибку, и я не уверен, что попробовать дальше:
PicklingError: Cannot pickle files that are not opened for reading
Я искушенный новичок, поэтому я надеюсь, что есть какой-то другой API или какой-нибудь способ распараллелить распечатку файлов в S3 и объединить их с исходным фреймом данных. Чтобы было ясно, я не пытаюсь ПРОЧИТАТЬ какие-либо данные в самих файлах S3, я строю таблицу, которая по сути является каталогом метаданных всех файлов в S3. Любые советы будут с благодарностью.