Как сделать асинхронный вызов функции, передав значение столбца каждой строки Dataframe в Pyspark? - PullRequest
0 голосов
/ 15 апреля 2020

У меня есть такая функция:

def callAPI(row): API_ENDPOINT = "https://xxxxxxxx.execute-api.us-east-1.amazonaws.com/dev/director" row = json.loads(row) r = requests.post(API_ENDPOINT, json.dumps(row)) return r.text

Я могу вызвать функцию синхронно для каждой строки и сохранить возвращенное значение в " столбец status:

seriesSanitize = f.udf(callAPI, StringType()) episodeDataF = episodeDataF.withColumn("status", seriesSanitize(f.col("episode")))

Теперь мне нужно сделать так, чтобы функция callAPI () была асинхронной , вместо ожидания ответа перед следующим вызовом , Для этого я переписал callAPI () асинхронным способом:

async def fetch(session, url, row):
async with session.post(url, json=row) as response:
    return await response.text()

async def callAPI(row):
async with aiohttp.ClientSession() as session:
    row = json.loads(row)
    API_ENDPOINT = "https://xxxxxxxx.execute-api.us-east-1.amazonaws.com/dev/director"

    status = await fetch(session, API_ENDPOINT, row)
    return status

Поскольку теперь функция callAPI () имеет вид асинхронный , какие изменения мне нужно внести в следующий код, чтобы работа как положено:

seriesSanitize = f.udf(callAPI, StringType()) episodeDataF = episodeDataF.withColumn("status", seriesSanitize(f.col("episode")))

...