OneHotEncoder с потоковой передачей данных - PullRequest
1 голос
/ 19 октября 2019

Я хотел бы применить OneHotEncoder к нескольким столбцам в моем потоковом фрейме данных, но я получил следующую ошибку. Какие-либо предложения?

Большое спасибо!

 Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources 
 must be executed with writeStream.start();;

КОД: // Чтение csv

  val Stream = spark.read
  .format("csv")
  .option("header", "true")
  .option("delimiter", ";")
  .option("header", "true")
  .schema(DFschema)
  .load("C:/[...]"/

// Kafka

   val properties = new Properties()
   //val topic = "mongotest"
   properties.put("bootstrap.servers", "localhost:9092")
   properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
   properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

   Stream.selectExpr("CAST(Col AS STRING) AS KEY", 
   "to_json(struct(*)) AS value")
   .writeStream
   .format("kafka")
   .option("topic", "predict")
   .option("kafka.bootstrap.servers", "localhost:9092")
   .option("checkpointLocation", "C:[...]")
   .start()

Подписаться на тему

  val lines = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "predict")
  .load()

  val df = Stream
  .selectExpr("CAST(value AS STRING)")
  val jsons = df.select(from_json($"value", DFschema) as "data").select("data.*")

ETL [...]

Применить функцию Bucketizer () к полю

 val Msplits = Array(Double.NegativeInfinity,7, 14, 21, Double.PositiveInfinity)
  val bucketizerM = new Bucketizer()
  .setInputCol("MEASURE")
  .setOutputCol("MEASURE_c")
  .setSplits(Msplits)

  val bucketedData1 = bucketizerD.transform(out)
  val bucketedData2 = bucketizerM.transform(bucketedData1) # Works

Ошибка при использовании OneHotEncoder ()

  val indexer = new StringIndexer()
  .setInputCol("CODE")
  .setOutputCol("CODE_index")

  val encoder = new OneHotEncoder()
  .setInputCol("CODE")
  .setOutputCol("CODE_encoded")

  val vectorAssembler = new VectorAssembler()
 .setInputCols(Array("A","B", "CODE_encoded"))
 .setOutputCol("features")

  val transformationPipeline = new Pipeline()
 .setStages(Array(indexer, encoder, vectorAssembler))

  val fittedPipeline = transformationPipeline.fit(bucketedData2) # Does't work
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...