Тяжелый Stateful UDF в писпарке - PullRequest
       41

Тяжелый Stateful UDF в писпарке

0 голосов
/ 03 декабря 2018

Мне нужно запустить действительно тяжелую функцию Python как UDF в Spark, и я хочу кэшировать некоторые данные внутри UDF.Случай похож на один упомянутый здесь

Я знаю, что это медленно и неправильно.Но существующая инфраструктура находится в искре, и я не хочу настраивать новую инфраструктуру и отдельно заниматься загрузкой / распараллеливанием / отказоустойчивостью данных для этого случая.

Вот так выглядит моя искровая программа:

from mymodule import my_function # here is my function
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()

schema = StructType().add("input", "string")
df = spark.read.format("json").schema(schema).load("s3://input_path")

udf1 = udf(my_function, StructType().add("output", "string"))
df.withColumn("result", udf1(df.input)).write.json("s3://output_path/")

my_function внутренне вызывает метод объекта с медленным конструктором.Поэтому я не хочу, чтобы объект инициализировался для каждой записи, и я пытаюсь его кэшировать:

from my_slow_class import SlowClass
from cachetools import cached

@cached(cache={}) 
def get_cached_object():
    # this call is really slow therefore I am trying 
    # to cache it with cachetools
    return SlowClass() 

def my_function(input):
    slow_object = get_cached_object()
    output = slow_object.call(input)
    return {'output': output}

mymodule и my_slow_class установлены как модули на каждой искровой машине.

Кажется, работает.Конструктор вызывается только несколько раз (только 10-20 раз для 100 тыс. Строк во входном фрейме данных).И это то, что я хочу.

Меня беспокоит многопоточность / многопроцессорность внутри исполнителей Spark, и если кэшированный экземпляр SlowObject используется несколькими параллельными вызовами my_function.

Можно ли положиться натот факт, что my_function вызывается по одному внутри процессов Python на рабочих узлах?Использует ли spark какую-либо многопроцессорность / многопоточность в процессе Python, который выполняет мой UDF?

1 Ответ

0 голосов
/ 03 декабря 2018

Spark разветвляет процесс Python для создания отдельных рабочих, однако вся обработка в отдельном рабочем процессе является последовательной, если только многопоточность или многопроцессорная обработка не используются явно UserDefinedFunction.

Так что, пока состояние используется длякэширование и slow_object.call - это чистая функция, вам не о чем беспокоиться.

...