У меня следующая ошибка:
File "script_2019-06-02-23-49-11.py", line 478, in <module>
datarobot1Df = datarobot1Df.rdd.mapPartitions(partitionMapper).toDF()
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/sql/session.py", line 58, in toDF
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/sql/session.py", line 582, in createDataFrame
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/sql/session.py", line 380, in _createFromRDD
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/sql/session.py", line 351, in _inferSchema
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 1361, in first
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 1343, in take
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/context.py", line 992, in runJob
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
File "/mnt/yarn/usercache/root/appcache/application_1559519164808_0001/container_1559519164808_0001_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects
Что здесь не так?Изит, потому что я пытаюсь использовать потоки для выполнения некоторых частей моего кода для вызова API?
logs = boto3.client('logs', region_name="ap-southeast-1")
NUMBER_THREADS = 6
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
streamId = datetime.now().strftime('%Y%m%d-%H%M%S')
logs.create_log_stream(
logGroupName='...',
logStreamName=streamId
)
prevLogSeqToken = None
def log(msg):
print(msg)
global prevLogSeqToken
if prevLogSeqToken != None:
resp = logs.put_log_events(
logGroupName="...",
logStreamName=streamId,
logEvents=[
{
'timestamp': int(time.time() * 1000),
'message': msg
}
],
sequenceToken=prevLogSeqToken
)
else:
resp = logs.put_log_events(
logGroupName="pinfare-glue",
logStreamName=streamId,
logEvents=[
{
'timestamp': int(time.time() * 1000),
'message': msg
}
],
)
prevLogSeqToken = resp["nextSequenceToken"]
log('Reading files ...')
inputGdf = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://..."]}, format = "parquet")
datarobot1Df = inputGdf.toDF()
datarobot1Df = datarobot1Df.limit(1000).repartition(6)
numRows = datarobot1Df.count()
log('Counted rows %d ...' % numRows)
datarobot1Df.show(10)
# print("Number of rows: %d (%s)" % (numRows, datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
if numRows > 0 :
# Given price, output groups eg. (-Inf, 200], (400, 600]
def getPriceInterval(price):
if (price <= 200):
return '(-Inf,200]'
if (price <= 400):
return '(200,400]'
if (price <= 600):
return '(400,600]'
if (price <= 800):
return '(600,800]'
else:
return '(800,Inf]'
spark.udf.register("getPriceInterval", getPriceInterval, DecimalType())
getPriceIntervalUdf = udf(getPriceInterval)
BATCH_SIZE = 3600
def callDatarobot(arr, data):
print("Calling datarobot %s" % time.ctime())
log("Calling datarobot ...")
# call mod A
dataJsonStr = json.dumps(data)
res = requests.post("http://...", data = dataJsonStr, headers = { 'Content-Type': 'application/json' })
batchResults = res.json()
batchResults = list(map(lambda o: o['prediction'], batchResults['data']))
for i, o in enumerate(batchResults):
data[i]['PredictedFare'] = o
# call mod B
dataJsonStr = json.dumps(data)
res = requests.post("http://...", data = dataJsonStr, headers = { 'Content-Type': 'application/json' })
batchResults = res.json()
batchResults = list(map(lambda o: o['prediction'], batchResults['data']))
for i, o in enumerate(batchResults):
data[i] = {
'PredictionSurge': o,
'price_chg_inc_gt10_a7': data[i]['price_chg_inc_gt10_a7']
}
arr.append(data)
def worker(q, arr):
while True:
jsonStr = q.get()
if jsonStr is None:
break
callDatarobot(arr, jsonStr)
q.task_done()
def partitionMapper(partition):
q = Queue()
arr = Manager().list()
threads = []
print("Partition mapper start %s" % time.ctime())
log("Partition mapper start %s" % time.ctime())
# init worker threads
for i in range(NUMBER_THREADS):
t = Thread(target=worker, args=(q, arr,))
t.start()
threads.append(t)
data = []
for row in partition:
data.append(row.asDict())
if (len(data) == BATCH_SIZE):
q.put(json.dumps(data))
data = []
if (len(data) > 0):
q.put(json.dumps(data))
data = []
# Wait till all jobs done
q.join()
# Stop workers
for i in range(NUMBER_THREADS):
q.put(None)
for t in threads:
t.join()
print("Partition mapper end %s" % time.ctime())
log("Partition mapper end")
return [val for sublist in arr for val in sublist]
# print("Number of partitions %d (%s)" % (datarobot1Df.rdd.getNumPartitions(), datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
datarobot1Df = datarobot1Df.withColumn('PriceCategory', getPriceIntervalUdf(datarobot1Df['Price']))
datarobot1Df = datarobot1Df.rdd.mapPartitions(partitionMapper).toDF()
log("Writing intermediate results")
datarobot1Df.write \
.mode('overwrite') \
.parquet('s3://...')
Также я могу проверить, будут ли API вызываться параллельно даже без потоков на каждом исполнителе?Когда я использую эту технику ранее без потоков, кажется, что только 1 исполнитель вызывает API.Это почему?