Запись отсортированного фрейма данных в тему kafka (строки отсортированного порядка) в структурированной потоковой передаче с использованием scala - PullRequest
0 голосов
/ 20 января 2019

при отображении результатов сортировки в консоли результаты отображаются в порядке сортировки, как и ожидалось, но когда я помещаю эти результаты в раздел kafka, порядок сортировки отсутствует

def main(args: Array[String]) = {
//Spark config and kafka config 
// load method 

val Raw_df = readStream(sparkSession, inputtopic)
//converting read kafka mesages into json format 
    val df_messages = Raw_df.selectExpr("CAST(value AS STRING)")
      .withColumn("data", from_json($"value", my_schema))
      .select("data.*")

  val win = window($"date_column","5 minutes")

 val modified_df = df_messages.withWatermark("date_column", "3 minutes")
    .groupBy(win,$"All_colums",  $"date_column")
    .count()  
    .orderBy(asc("date_column"),asc("column_5"))

    val finalcol = modified_df.drop("count").drop("window")

   //mapping all columsn and converting them to json mesages
    val finalcolonames = my_schema.fields.map(z => z.name)
    val dataset_Json = finalcol.withColumn("value", to_json(struct(finalcolonames.map(y => col(y)): _*)))
          .select($"value")
   //val query = writeToKafkaStremoutput(dataset_Json, outputtopic,checkpointlocation)
    val query = writeToConsole(order)  

    (query)
  }
  //below method write data to kafka topic 
  def writeToKafkaStremoutput(dataFrame: DataFrame,  Config: Config, topic: String,checkpointlocation:String) = {
    dataFrame
      .selectExpr( "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .trigger(Trigger.ProcessingTime("1 second"))
      .option("topic", topic)
      .option("kafka.bootstrap.servers", "kafka.bootstrap_servers")
      .option("checkpointLocation",checkpointPath)
      .outputMode(OutputMode.Complete()) 
      .start()
  }
//console op for testing
// below method write data toconsole 

  def writeToConsole(dataFrame: DataFrame) = {
    import org.apache.spark.sql.streaming.Trigger
    val query = dataFrame
      .writeStream
      .format("console")
      .option("numRows",300) 
      //.trigger(Trigger.ProcessingTime("20 second"))
      .outputMode(OutputMode.Complete())
      .option("truncate", false) 
      .start()
    query
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...