Как обрабатывать разделы pyspark dataframe асинхронно - PullRequest
0 голосов
/ 08 мая 2020

Я недавно начал играть с pyspark, так как мне приходится иметь дело с большими объемами данных. Задача, которая стояла передо мной в данный момент, - получить некоторую ценность, вызвав запущенную нами службу http. Мои данные хранятся в фрейме данных pyspark и выглядят так:

+--------------------+----------+
|                name|        id|
+--------------------+----------+
|xxxxxxxxxxxxxxxxx...|         1|
|yyyyyyyyyyyyyyyyy...|         2|
+--------------------+----------+

Я использую udf для вызова этой службы в столбце id, получаю новое значение (new_id) и добавляю его в существующий фрейм данных. Сетевая задержка не является проблемой, поскольку все устройства находятся в одном vpn. Мой код выглядит следующим образом:

def update_df(df):
    udf_get_new_id = udf(get_new_id, ArrayType(StringType()))

    df = df.withColumn('new_id', udf_get_new_id(df.id)[0])
    # My request might fail so I want to log the status code of the request as well
    updated_df = df.withColumn('status_code', udf_get_new_id(df.id)[1])
    return updated_df

def get_new_id(id)
    url = SOME_HTTP_URL
    headers = {'content-type': 'application/json',
               'Connection': 'keep-alive',
               'Content - Length': '125',
               'cache - control': 'no - cache',
               'keep-alive': 'timeout = 5, max = 1000'
               }
    body = {'id': id}
    response_string = requests.post(url, data = json.dumps(body), headers = headers)
    status_code = response_string.status_code
    response_content = ast.literal_eval(response_string.content.decode())
    new_id = response_content['new_id']
    return [new_id, status_code]

Я вызываю эти функции следующим образом:

updated_df = update_df(df)

start_time = timer()
updated_df.repartition(100).collect()
end_time = timer()
print(end_time - start_time)

Обновленные данные должны выглядеть так:

+--------------------+----------+----------+-----------+
|                name|        id|    new_id|status_code|
+--------------------+----------+----------+-----------+
|xxxxxxxxxxxxxxxxx...|         1|        XX|        200|
|yyyyyyyyyyyyyyyyy...|         2|        YY|        200|
+--------------------+----------+----------+-----------+

Служба, которую я вызываю в этом случае, может обрабатывать миллионы запросов в секунду, и я хочу использовать ее как можно чаще, и мне интересно, есть ли способы оптимизировать этот текущий код. Если вам интересно, вот максимальная вычислительная мощность, которую мне удалось получить для этого (команда spark-submit, которую я использую для запуска кода):

/opt/spark-2.4.4-bin-without-hadoop/bin/spark-submit --conf spark.app.name=test_app --master yarn --deploy-mode client --num-executors 4 --executor-cores 4 --executor-memory 2G --driver-cores 4 --driver-memory 2G

Я слышал от моих друзей что они собирались обрабатывать разделы асинхронно в scala и задавались вопросом, лучшее ли это, что я мог бы сделать с фреймом данных pyspark, или есть способ оптимизировать это? Приветствуются любые отзывы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...