Как добавить udf как этап в конвейере PySpark ML? - PullRequest
0 голосов
/ 02 марта 2020

sample df:

+-----------+
|pred_vector|
+-----------+
| [0.5, 0.6]|                  
| [0.7, 0.8]|                   
| [1.1, 1.5]|                                 
+-----------+

Я использую следующую функцию, чтобы извлечь значение по i-му индексу из плотного вектора и преобразовать в float:

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, udf

def ith_(v, i):
    try:
        return float(v[i])
    except ValueError:
        return None

ith = udf(ith_, DoubleType())

df.select(ith("features", lit(1))).show()

## +-----------------+
## |ith_(features, 1)|
## +-----------------+
## |              0.5|
## |              0.7|
## |              1.1|
## +-----------------+

Используя следующий код, я хочу создать собственный трансформатор, который я могу добавить на этапах конвейера. Предполагается извлечь первое значение из вектора в столбце pred_vector в df, преобразовать его в float и сохранить в новом столбце в df

# CUSTOM TRANSFORMER ----------------------------------------------------------------
class TypeConverter(Transformer):
    """
    A custom Transformer which converts 'pred_vector' column dense vector type to float.
    """

    def __init__(self):
        super(TypeConverter, self).__init__()


    def _transform(self, df: DataFrame) -> DataFrame:
        df = df.withColumn("new_column", ith("pred_vector"))
        return df

df:
+-----------+-----------+
|pred_vector|new_column |
+-----------+-----------+
| [0.5, 0.6]| 0.5       |          
| [0.7, 0.8]| 0.7       |            
| [1.1, 1.5]| 1.1       |                         
+-----------+-----------+

Как я могу вписать udf в Кастом трансформер?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...