Нужно оптимизировать звонок в службу отдыха в спарк - PullRequest
0 голосов
/ 03 октября 2019

Мне нужно вызвать службу отдыха для каждого ряда набора данных в искре. Я создал следующий код:

import requests

df= spark.read.parquet("file.parquet")

for row in df.rdd.collect():
  requests.post('rest.api/endpoint')

Я не уверен, что это лучший способ сделать это с точки зрения производительности. Есть ли лучший способ добиться этого?

1 Ответ

0 голосов
/ 03 октября 2019

Запустив его на .collect результатах, вы потеряете парализатор, и все запросы будут выполнены драйвером.

Вы можете создать UDF, который будет вызывать API для каждой из строк:

from pyspark.sql.functions import udf
import requests

api = "https://swapi.co/api/people/"

@udf("string")
def swapiGetPersonName(id):
    response = requests.get(api + str(id))
    return response.json()["name"]

df = spark.range(1,10)
df.select("id", swapiGetPersonName("id").alias("name")).show()

Однако, если у вас много данных, это может легко перегрузить как службу отдыха, так и вашего исполнителя. ,(вы в значительной степени будете выполнять атаку типа «отказ в обслуживании» или не использовать сокеты). Если это проблема, вы можете либо

  • пакетных данных, загрузив их подмножество за время
  • пакетных данных, разделив их с помощью foreachPartition и обрабатывая строки по однойв каждом
  • потоковых данных с использованием структурированной потоковой передачи и ограничения количества строк, которые вы обрабатываете за раз
  • используйте API (или измените свой), который поддерживает пакетные операции (вместо каждой строки загружайте целый раздел/ значительная часть ваших данных)

Это из головы, но, помимо слишком большого количества обращений в службу, не забудьте добавить правильную обработку исключений :)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...