Мультипроцессорное преобразование тела контента S3 в панд DataFrame AWS Sagemaker - PullRequest
0 голосов
/ 21 мая 2019

В записной книжке conda_python3 AWS SageMaker я определил следующую функцию, которая превращает содержимое объекта S3 в фрейм данных:

import io
import pandas as pd
def readS3Csv(corpus):
    df = pd.read_csv(io.BytesIO(corpus['Body'].read()))
    print(str(corpus) + ' read')
    return(df)

Я протестировал его с помощью:

corpus1 = s3.get_object(Bucket='XXXX', Key='ZZZZ')
x = readS3Csv(corpus1)

И до сих пор это работало хорошо.Затем я попытался паралеллизировать преобразование содержимого в DataFrame для содержимого в различных объектах S3:

corpus1 = s3.get_object(Bucket='XXX', Key='QQQ')
corpus2 = s3.get_object(Bucket='XXX', Key='EEE')
corpus3 = s3.get_object(Bucket='XXX', Key='KKK')
corpus4 = s3.get_object(Bucket='XXX', Key='ZZZ')

Я использовал многопроцессорную библиотеку как:

corpus = [corpus1,corpus2,corpus3,corpus4,corpus5,corpus6]
pool = multiprocessing.Pool(processes = 6)

dfs = pool.map(readS3Cvs, corpus)

Я получил эту ошибку:

TypeError: не может сериализовать объект '_io.BufferedReader'

Затем я попытался:

with multiprocessing.Pool() as p:
    print(p.map(readS3Csv, corpus))

И я все еще получил ту же ошибку.

Затем я попытался включить объект s3.get в определенную функцию следующим образом:

import io
import pandas as pd
import boto3

def readS3Csv(key):
    s3 = boto3.client(
            's3',
            aws_access_key_id='HHH',
            aws_secret_access_key='ZZZ'
        )
    corpus = s3.get_object(Bucket='XXX', Key=key)
    df = pd.read_csv(io.BytesIO(corpus['Body'].read()))
    print(str(key) + ' read')
    return(df)

И когда я запустил:

keys = ['ttt','uuu','rrr','iii']
dfs = readS3Csv(keys[0])

, я получил кадр данных без ошибок.Но когда я делаю

keys = ['ttt','uuu','rrr','iii']
dfs = pool.map(readS3Csv,keys)

Но я все равно получаю сообщение об ошибке:

Причина: 'ошибка (для формата' 'i' требуется -2147483648 <= число <= 2147483647 ",) '</p>

1 Ответ

1 голос
/ 23 мая 2019

s3.get_object возвращает dict, содержащий StreamingBody, который не может быть сериализован, так как является не более чем оболочкой для потокового считывателя.Чтобы многопроцессорная обработка работала, параметры рабочих процессов должны быть сериализуемыми.

Поскольку ваше имя сегмента является константой, вы должны передать имя ключа readS3Csv () и выполнить s3.get_object внутри этого метода, а не передавать в корпус.

...