I would like to apply OneHotEncoder to several columns of my streaming DataFrame, but I get the following error. Any suggestions?
Thanks a lot!
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources
must be executed with writeStream.start();;
CODE:
// Read the CSV as a stream (streaming reads require an explicit schema)
val Stream = spark.readStream
  .format("csv")
  .option("header", "true")
  .option("delimiter", ";")
  .schema(DFschema)
  .load("C:/[...]")
// Kafka producer properties (side note: the Structured Streaming Kafka sink
// below is configured via .option(...), so these Properties go unused here)
import java.util.Properties
val properties = new Properties()
//val topic = "mongotest"
properties.put("bootstrap.servers", "localhost:9092")
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
// Write the stream to Kafka (the sink expects lowercase key/value columns)
Stream.selectExpr("CAST(Col AS STRING) AS key",
    "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("topic", "predict")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("checkpointLocation", "C:[...]")
  .start()
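One thing I am unsure about: start() returns a StreamingQuery that I currently discard. A sketch of the same sink with the handle kept and awaited, so main() stays alive while the query runs:

import org.apache.spark.sql.streaming.StreamingQuery
val kafkaQuery: StreamingQuery = Stream
  .selectExpr("CAST(Col AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("topic", "predict")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("checkpointLocation", "C:[...]")
  .start()
kafkaQuery.awaitTermination() // block main() until the query terminates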
Subscribe to the topic:
val lines = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "predict")
.load()
val df = lines // cast the Kafka payload, not the CSV source
  .selectExpr("CAST(value AS STRING)")
val jsons = df.select(from_json($"value", DFschema) as "data").select("data.*")
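To check that from_json with DFschema parses the Kafka payload correctly, I print a few rows with the console sink (debug sketch, local run only):

// Debug sketch: print parsed rows to the console to verify the schema mapping
jsons.writeStream
  .format("console")
  .option("truncate", "false")
  .start()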
ETL [...]
Apply Bucketizer() to a field:
val Msplits = Array(Double.NegativeInfinity, 7.0, 14.0, 21.0, Double.PositiveInfinity)
val bucketizerM = new Bucketizer()
  .setInputCol("MEASURE")
  .setOutputCol("MEASURE_c")
  .setSplits(Msplits)
// bucketizerD and out come from the elided ETL step above
val bucketedData1 = bucketizerD.transform(out)
val bucketedData2 = bucketizerM.transform(bucketedData1) // Works
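For what it's worth, Bucketizer is a pure Transformer, so its lazy transform() composes with a streaming DataFrame without triggering a job. I assume the same holds for any already-fitted model, e.g. a previously saved StringIndexerModel (hypothetical path):

import org.apache.spark.ml.feature.StringIndexerModel
// Hypothetical: an indexer model fitted and saved in an earlier batch job;
// transform() is lazy, so applying it to the stream should work like Bucketizer
val savedIndexer = StringIndexerModel.load("C:/[...]/indexerModel")
val indexed = savedIndexer.transform(bucketedData2)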
Error when using OneHotEncoder():
val indexer = new StringIndexer()
  .setInputCol("CODE")
  .setOutputCol("CODE_index")
val encoder = new OneHotEncoder()
  .setInputCol("CODE_index") // encode the indexed column, not the raw string
  .setOutputCol("CODE_encoded")
val vectorAssembler = new VectorAssembler()
  .setInputCols(Array("A", "B", "CODE_encoded"))
  .setOutputCol("features")
val transformationPipeline = new Pipeline()
  .setStages(Array(indexer, encoder, vectorAssembler))
val fittedPipeline = transformationPipeline.fit(bucketedData2) // Doesn't work: throws the AnalysisException above
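Would the right approach be to fit the pipeline on a static (batch) read of the same data, and then only call the fitted pipeline's transform() on the stream? A sketch of what I have in mind (assuming the elided ETL and bucketizing steps are applied to the static frame the same way, yielding staticBucketed):

val staticData = spark.read // batch read of the same CSV, not readStream
  .format("csv")
  .option("header", "true")
  .option("delimiter", ";")
  .schema(DFschema)
  .load("C:/[...]")
// ... same ETL + bucketizing as above, producing staticBucketed ...
val fitted = transformationPipeline.fit(staticBucketed) // fit on batch: no AnalysisException
val encodedStream = fitted.transform(bucketedData2)     // lazy transform on the stream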