Попробуйте использовать библиотеку sklearn в Spark Structured Streaming - PullRequest
0 голосов
/ 11 мая 2018

Я хочу применить функцию кодирования меток sklearn.preprocessing для потоковой передачи данных с использованием структурированной потоковой передачи Kafka и Spark. Пока идея такова:

Когда я получаю пакет данных из источника Kafka каждый раз, я хочу реализовать функцию для этого пакета следующим образом:

def use_label_encoder(label_encoder, y):
   return label_encoder.transform(y) + 1

to_transform_class_val = udf(use_label_encoder, IntegerType())

Вот схема:

schema = StructType([
StructField("sepal_length_in_cm", StringType()), \
StructField("sepal_width_in_cm", StringType()), \
StructField("petal_length_in_cm", StringType()), \
StructField("petal_width_in_cm", StringType()), \
StructField("class", StringType())
])

df = df.selectExpr("CAST(value AS STRING)")
df1 = df.select(from_json(df.value, schema).alias("json"))

когда я пытаюсь определить label_encoder:

label_encoder = enc.fit(df1.select(to_upper("json.class")))

выдает ошибку "неправильная форма ввода"

Эквивалентный код, который я использовал для не потоковых данных:

y = df['class'].values
enc = LabelEncoder()
label_encoder = enc.fit(y)
y = label_encoder.transform(y) + 1

Может ли кто-нибудь помочь мне с тем, как применить метод sklearn для потоковой передачи данных?

1 Ответ

0 голосов
/ 11 мая 2018

Можете ли вы добавить 1 позже? Ваш искровой код станет

def use_label_encoder(label_encoder, y):
    return label_encoder.transform(y)

to_transform_class_val = udf(use_label_encoder, IntegerType())

df = df.withColumn('new_col', to_transform_class_val(label_encoder, 'old_column'))
df = df.withColumn('label_enc', col('new_col') + lit(1))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...