У меня есть такая функция:
def callAPI(row):
API_ENDPOINT = "https://xxxxxxxx.execute-api.us-east-1.amazonaws.com/dev/director"
row = json.loads(row)
r = requests.post(API_ENDPOINT, json.dumps(row))
return r.text
Я могу вызвать функцию синхронно для каждой строки и сохранить возвращенное значение в " столбец status:
seriesSanitize = f.udf(callAPI, StringType())
episodeDataF = episodeDataF.withColumn("status", seriesSanitize(f.col("episode")))
Теперь мне нужно сделать так, чтобы функция callAPI () была асинхронной , вместо ожидания ответа перед следующим вызовом , Для этого я переписал callAPI () асинхронным способом:
async def fetch(session, url, row):
async with session.post(url, json=row) as response:
return await response.text()
async def callAPI(row):
async with aiohttp.ClientSession() as session:
row = json.loads(row)
API_ENDPOINT = "https://xxxxxxxx.execute-api.us-east-1.amazonaws.com/dev/director"
status = await fetch(session, API_ENDPOINT, row)
return status
Поскольку теперь функция callAPI () имеет вид асинхронный , какие изменения мне нужно внести в следующий код, чтобы работа как положено:
seriesSanitize = f.udf(callAPI, StringType())
episodeDataF = episodeDataF.withColumn("status", seriesSanitize(f.col("episode")))