Как преобразовать искровой поток в одну строку - PullRequest
0 голосов
/ 21 апреля 2020

Я беру данные из Spark Stream и передаю их в файл python для прогноза. Я хочу, чтобы данные были в одной строке, чтобы я мог вставить их в файл python, но я не получаю поток в одну строку. Как конвертировать искровой поток в String

val stm = new StreamingContext(sc, Seconds(10))
  val socketStream = stm.socketTextStream("localhost", 9999)
  val script = "python python file path"
  socketStream.foreachRDD(
    line => if (!line.partitions.isEmpty) {
      val text = line.partitions.mkString(" ")
      val operation = line.pipe(script)
      val output = operation.collect
      output.print()
    }
  )
  socketStream.print()
  stm.start()
  stm.awaitTermination()

1 Ответ

0 голосов
/ 21 апреля 2020
def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val df : DataFrame = spark.readStream
      .format("socket")
      .option("host","localhost")
      .option("port","9090")
      .load()

    val stringEncoder: Encoder[String] = Encoders.STRING
    df.map( r => r.mkString)(stringEncoder)
      .groupByKey( s => "")(stringEncoder)
      .mapGroups((k,v) => {
        v.mkString("|")
      })(stringEncoder)
      .foreach(println(_))
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...