У меня есть давно выполняющиеся задачи (udf), которые мне нужно запустить в PySpark, некоторые из них могут работать часами, но я бы хотел добавить какую-то оболочку тайм-аута на случай, если они действительно работают слишком долго. Я просто хотел бы вернуть None
, если есть тайм-аут.
Я сделал что-то с signal
, но уверен, что это не самый безопасный способ сделать это.
import pyspark
import signal
import time
from pyspark import SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import udf
conf = pyspark.SparkConf()
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = SQLContext(sc)
schema = StructType([
StructField("sleep", IntegerType(), True),
StructField("value", StringType(), True),
])
data = [[1, "a"], [2, "b"], [3, "c"], [4, "d"], [1, "e"], [2, "f"]]
df = spark.createDataFrame(data, schema=schema)
def handler(signum, frame):
raise TimeoutError()
def squared_typed(s):
def run_timeout():
signal.signal(signal.SIGALRM, handler)
signal.alarm(3)
time.sleep(s)
return s * s
try:
return run_timeout()
except TimeoutError as e:
return None
squared_udf = udf(squared_typed, IntegerType())
df.withColumn('sq', squared_udf('sleep')).show()
Это работает и дает ожидаемый результат, но есть ли способ сделать это более pysparkly способом?
+-----+-----+----+
|sleep|value| sq|
+-----+-----+----+
| 1| a| 1|
| 2| b| 4|
| 3| c|null|
| 4| d|null|
| 1| e| 1|
| 2| f| 4|
+-----+-----+----+
Спасибо