Я недавно начал играть с pyspark, так как мне приходится иметь дело с большими объемами данных. Задача, которая стояла передо мной в данный момент, - получить некоторую ценность, вызвав запущенную нами службу http. Мои данные хранятся в фрейме данных pyspark и выглядят так:
+--------------------+----------+
| name| id|
+--------------------+----------+
|xxxxxxxxxxxxxxxxx...| 1|
|yyyyyyyyyyyyyyyyy...| 2|
+--------------------+----------+
Я использую udf для вызова этой службы в столбце id
, получаю новое значение (new_id
) и добавляю его в существующий фрейм данных. Сетевая задержка не является проблемой, поскольку все устройства находятся в одном vpn. Мой код выглядит следующим образом:
def update_df(df):
udf_get_new_id = udf(get_new_id, ArrayType(StringType()))
df = df.withColumn('new_id', udf_get_new_id(df.id)[0])
# My request might fail so I want to log the status code of the request as well
updated_df = df.withColumn('status_code', udf_get_new_id(df.id)[1])
return updated_df
def get_new_id(id)
url = SOME_HTTP_URL
headers = {'content-type': 'application/json',
'Connection': 'keep-alive',
'Content - Length': '125',
'cache - control': 'no - cache',
'keep-alive': 'timeout = 5, max = 1000'
}
body = {'id': id}
response_string = requests.post(url, data = json.dumps(body), headers = headers)
status_code = response_string.status_code
response_content = ast.literal_eval(response_string.content.decode())
new_id = response_content['new_id']
return [new_id, status_code]
Я вызываю эти функции следующим образом:
updated_df = update_df(df)
start_time = timer()
updated_df.repartition(100).collect()
end_time = timer()
print(end_time - start_time)
Обновленные данные должны выглядеть так:
+--------------------+----------+----------+-----------+
| name| id| new_id|status_code|
+--------------------+----------+----------+-----------+
|xxxxxxxxxxxxxxxxx...| 1| XX| 200|
|yyyyyyyyyyyyyyyyy...| 2| YY| 200|
+--------------------+----------+----------+-----------+
Служба, которую я вызываю в этом случае, может обрабатывать миллионы запросов в секунду, и я хочу использовать ее как можно чаще, и мне интересно, есть ли способы оптимизировать этот текущий код. Если вам интересно, вот максимальная вычислительная мощность, которую мне удалось получить для этого (команда spark-submit
, которую я использую для запуска кода):
/opt/spark-2.4.4-bin-without-hadoop/bin/spark-submit --conf spark.app.name=test_app --master yarn --deploy-mode client --num-executors 4 --executor-cores 4 --executor-memory 2G --driver-cores 4 --driver-memory 2G
Я слышал от моих друзей что они собирались обрабатывать разделы асинхронно в scala и задавались вопросом, лучшее ли это, что я мог бы сделать с фреймом данных pyspark, или есть способ оптимизировать это? Приветствуются любые отзывы.