Запустив его на .collect
результатах, вы потеряете парализатор, и все запросы будут выполнены драйвером.
Вы можете создать UDF, который будет вызывать API для каждой из строк:
from pyspark.sql.functions import udf
import requests
api = "https://swapi.co/api/people/"
@udf("string")
def swapiGetPersonName(id):
response = requests.get(api + str(id))
return response.json()["name"]
df = spark.range(1,10)
df.select("id", swapiGetPersonName("id").alias("name")).show()
Однако, если у вас много данных, это может легко перегрузить как службу отдыха, так и вашего исполнителя. ,(вы в значительной степени будете выполнять атаку типа «отказ в обслуживании» или не использовать сокеты). Если это проблема, вы можете либо
- пакетных данных, загрузив их подмножество за время
- пакетных данных, разделив их с помощью
foreachPartition
и обрабатывая строки по однойв каждом - потоковых данных с использованием структурированной потоковой передачи и ограничения количества строк, которые вы обрабатываете за раз
- используйте API (или измените свой), который поддерживает пакетные операции (вместо каждой строки загружайте целый раздел/ значительная часть ваших данных)
Это из головы, но, помимо слишком большого количества обращений в службу, не забудьте добавить правильную обработку исключений :)