Мне нужно запустить действительно тяжелую функцию Python как UDF в Spark, и я хочу кэшировать некоторые данные внутри UDF.Случай похож на один упомянутый здесь
Я знаю, что это медленно и неправильно.Но существующая инфраструктура находится в искре, и я не хочу настраивать новую инфраструктуру и отдельно заниматься загрузкой / распараллеливанием / отказоустойчивостью данных для этого случая.
Вот так выглядит моя искровая программа:
from mymodule import my_function # here is my function
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
schema = StructType().add("input", "string")
df = spark.read.format("json").schema(schema).load("s3://input_path")
udf1 = udf(my_function, StructType().add("output", "string"))
df.withColumn("result", udf1(df.input)).write.json("s3://output_path/")
my_function
внутренне вызывает метод объекта с медленным конструктором.Поэтому я не хочу, чтобы объект инициализировался для каждой записи, и я пытаюсь его кэшировать:
from my_slow_class import SlowClass
from cachetools import cached
@cached(cache={})
def get_cached_object():
# this call is really slow therefore I am trying
# to cache it with cachetools
return SlowClass()
def my_function(input):
slow_object = get_cached_object()
output = slow_object.call(input)
return {'output': output}
mymodule
и my_slow_class
установлены как модули на каждой искровой машине.
Кажется, работает.Конструктор вызывается только несколько раз (только 10-20 раз для 100 тыс. Строк во входном фрейме данных).И это то, что я хочу.
Меня беспокоит многопоточность / многопроцессорность внутри исполнителей Spark, и если кэшированный экземпляр SlowObject
используется несколькими параллельными вызовами my_function
.
Можно ли положиться натот факт, что my_function
вызывается по одному внутри процессов Python на рабочих узлах?Использует ли spark какую-либо многопроцессорность / многопоточность в процессе Python, который выполняет мой UDF?