Вызвать конечную точку Sagemaker с помощью Spark (EMR Cluster) - PullRequest
0 голосов
/ 26 марта 2020

Я разрабатываю искровое приложение в кластере EMR. Ход проекта выглядит следующим образом:

Датафрейм перераспределяется на основе идентификатора.

Конечная точка Sagemaker должна вызываться в каждом разделе и получать результат.

Но при этом я получаю эту ошибку:

cPickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects

Код выглядит следующим образом:


from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkConf
import itertools
import json
import boto3
import time
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql import functions as F
from pyspark.sql.functions import lit
from io import BytesIO as StringIO


client=boto3.client('sagemaker-runtime')

def invoke_endpoint(json_data):
    ansJson=json.dumps(json_data)
    response=client.invoke_endpoint(EndpointName="<EndpointName>",Body=ansJson,ContentType='text/csv',Accept='Accept')
    resultJson=json.loads(str(response['Body'].read().decode('ascii')))
    return resultJson

def execute(list_of_url):
    final_iterator=[]
    urlist=[]
    json_data={}
    for url in list_of_url:
        final_iterator.append((url.ID,url.Prediction))
        urlist.append(url.ID)
    json_data['URL']=urlist
    ressultjson=invoke_endpoint(json_data)
    return iter(final_iterator)

### Atributes to be added to Spark Conf
conf = (SparkConf().set("spark.executor.extraJavaOptions","-Dcom.amazonaws.services.s3.enableV4=true").set("spark.driver.extraJavaOptions","-Dcom.amazonaws.services.s3.enableV4=true"))


scT=SparkContext(conf=conf)
scT.setSystemProperty("com.amazonaws.services.s3.enableV4","true")

hadoopConf=scT._jsc.hadoopConfiguration()
hadoopConf.set("f3.s3a.awsAccessKeyId","<AccessKeyId>")
hadoopConf.set("f3.s3a.awsSecretAccessKeyId","<SecretAccessKeyId>")
hadoopConf.set("f3.s3a.endpoint","s3-us-east-1.amazonaws.com")
hadoopConf.set("com.amazonaws.services.s3.enableV4","true")
hadoopConf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")


sql=SparkSession(scT)
csv_df=sql.read.csv('s3 path to my csv file',header =True)

#print('Total count is',csv_df.count())

csv_dup_df=csv_df.dropDuplicates(['ID'])
print('Total count is',csv_dup_df.count())

windowSpec=Window.orderBy("ID")

result_df=csv_dup_df.withColumn("ImageID",F.row_number().over(windowSpec)%80)

final_df=result_df.withColumn("Prediction",lit(str("UNKOWN")))


df2 = final_df.repartition("ImageID")


df3=df2.rdd.mapPartitions(lambda url: execute(url)).toDF()
df3.coalesce(1).write.mode("overwrite").save("s3 path to save the results in csv format",format="csv")

print(df3.rdd.glom().collect())
##Ok
print("Work is Done")

Можете ли вы сказать мне, как исправить эту проблему?

...