Исключение поднять исключение, когда условие - PullRequest
0 голосов
/ 28 апреля 2020

Вопрос

Как вы знаете, у Spark есть проблема с приведением типов: если тип не может быть приведен, Spark не вызывает исключение, а вместо этого устанавливает ноль. В свою очередь, я хочу выполнить отказоустойчивость при приведении типов.

Я знаю, что наилучшей практикой при отказоустойчивости при приведении типов является указание схемы при чтении (для csv .mode ('FAILFAST')).

Но у меня есть устаревший код, где все столбцы читаются как строки и преобразуются в требуемую схему при записи. Мне интересно, по крайней мере теоретически:

Существует ли какой-либо подход к производительности для Spark с отказоустойчивостью во время приведения типов при записи?

Не рабочий пример

# called after all transformations
def cast_to_schema(df: DataFrame, schema: StructType) -> DataFrame:
    columns = [cast_to_type(F.col(f.name), f.dataType) for f in schema.fields]
    return df.select(*columns) 

# doesn't work - UDF and F.when don't play well together, UDF is called even when there aren't any casting issues
def cast_to_type(col: Column, d_type: DataType) -> DataFrame:
    casted = col.cast(d_type)
    casting_failed = col.isNotNull() & casted.isNull()

    def handler(value):
        raise Exception('Value' + value + 'can not be casted'))
    on_fail_udf = F.udf(handler, d_type.__class__.__name__, d_type)

    return F.when(casting_failed, on_fail_udf(col)).otherwise(casted)

PS

Я не хочу использовать UDF (потому что они медленные в PySpark). Я не хочу использовать Scala UDF, потому что это слишком много - лучше определять схему при чтении.

Также все подходы, которые включают в себя кэширование или двойное чтение, не подходят для меня.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...