В записной книжке conda_python3 AWS SageMaker я определил следующую функцию, которая превращает содержимое объекта S3 в фрейм данных:
import io
import pandas as pd
def readS3Csv(corpus):
df = pd.read_csv(io.BytesIO(corpus['Body'].read()))
print(str(corpus) + ' read')
return(df)
Я протестировал его с помощью:
corpus1 = s3.get_object(Bucket='XXXX', Key='ZZZZ')
x = readS3Csv(corpus1)
И до сих пор это работало хорошо.Затем я попытался паралеллизировать преобразование содержимого в DataFrame для содержимого в различных объектах S3:
corpus1 = s3.get_object(Bucket='XXX', Key='QQQ')
corpus2 = s3.get_object(Bucket='XXX', Key='EEE')
corpus3 = s3.get_object(Bucket='XXX', Key='KKK')
corpus4 = s3.get_object(Bucket='XXX', Key='ZZZ')
Я использовал многопроцессорную библиотеку как:
corpus = [corpus1,corpus2,corpus3,corpus4,corpus5,corpus6]
pool = multiprocessing.Pool(processes = 6)
dfs = pool.map(readS3Cvs, corpus)
Я получил эту ошибку:
TypeError: не может сериализовать объект '_io.BufferedReader'
Затем я попытался:
with multiprocessing.Pool() as p:
print(p.map(readS3Csv, corpus))
И я все еще получил ту же ошибку.
Затем я попытался включить объект s3.get в определенную функцию следующим образом:
import io
import pandas as pd
import boto3
def readS3Csv(key):
s3 = boto3.client(
's3',
aws_access_key_id='HHH',
aws_secret_access_key='ZZZ'
)
corpus = s3.get_object(Bucket='XXX', Key=key)
df = pd.read_csv(io.BytesIO(corpus['Body'].read()))
print(str(key) + ' read')
return(df)
И когда я запустил:
keys = ['ttt','uuu','rrr','iii']
dfs = readS3Csv(keys[0])
, я получил кадр данных без ошибок.Но когда я делаю
keys = ['ttt','uuu','rrr','iii']
dfs = pool.map(readS3Csv,keys)
Но я все равно получаю сообщение об ошибке:
Причина: 'ошибка (для формата' 'i' требуется -2147483648 <= число <= 2147483647 ",) '</p>