Возвращать None по таймауту с PySpark UDF - PullRequest
0 голосов
/ 05 мая 2020

У меня есть давно выполняющиеся задачи (udf), которые мне нужно запустить в PySpark, некоторые из них могут работать часами, но я бы хотел добавить какую-то оболочку тайм-аута на случай, если они действительно работают слишком долго. Я просто хотел бы вернуть None, если есть тайм-аут.

Я сделал что-то с signal, но уверен, что это не самый безопасный способ сделать это.

import pyspark
import signal
import time

from pyspark import SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import udf

conf = pyspark.SparkConf() 
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = SQLContext(sc)


schema = StructType([
    StructField("sleep", IntegerType(), True),    
    StructField("value", StringType(), True),
])

data = [[1, "a"], [2, "b"], [3, "c"], [4, "d"], [1, "e"], [2, "f"]]

df = spark.createDataFrame(data, schema=schema)

def handler(signum, frame):
    raise TimeoutError()

def squared_typed(s):
    def run_timeout():
        signal.signal(signal.SIGALRM, handler)
        signal.alarm(3)

        time.sleep(s)

        return s * s

    try:
        return run_timeout()
    except TimeoutError as e:
        return None

squared_udf = udf(squared_typed, IntegerType())

df.withColumn('sq', squared_udf('sleep')).show()

Это работает и дает ожидаемый результат, но есть ли способ сделать это более pysparkly способом?

+-----+-----+----+
|sleep|value|  sq|
+-----+-----+----+
|    1|    a|   1|
|    2|    b|   4|
|    3|    c|null|
|    4|    d|null|
|    1|    e|   1|
|    2|    f|   4|
+-----+-----+----+

Спасибо

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...