Я хочу применить функцию кодирования меток sklearn.preprocessing для потоковой передачи данных с использованием структурированной потоковой передачи Kafka и Spark. Пока идея такова:
Когда я получаю пакет данных из источника Kafka каждый раз, я хочу реализовать функцию для этого пакета следующим образом:
def use_label_encoder(label_encoder, y):
return label_encoder.transform(y) + 1
to_transform_class_val = udf(use_label_encoder, IntegerType())
Вот схема:
schema = StructType([
StructField("sepal_length_in_cm", StringType()), \
StructField("sepal_width_in_cm", StringType()), \
StructField("petal_length_in_cm", StringType()), \
StructField("petal_width_in_cm", StringType()), \
StructField("class", StringType())
])
df = df.selectExpr("CAST(value AS STRING)")
df1 = df.select(from_json(df.value, schema).alias("json"))
когда я пытаюсь определить label_encoder:
label_encoder = enc.fit(df1.select(to_upper("json.class")))
выдает ошибку "неправильная форма ввода"
Эквивалентный код, который я использовал для не потоковых данных:
y = df['class'].values
enc = LabelEncoder()
label_encoder = enc.fit(y)
y = label_encoder.transform(y) + 1
Может ли кто-нибудь помочь мне с тем, как применить метод sklearn для потоковой передачи данных?