Spark структурированный поток: как ввести метку времени из командной строки? - PullRequest
0 голосов
/ 15 февраля 2020

Я использую команду "n c -lp 9999" для ввода данных сокета для структурированного потокового ввода, я использую этот файл scala из inte rnet.

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object wordCountWindowed {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
        " <window duration in seconds> [<slide duration in seconds>]")
      System.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt
    val windowSize = args(2).toInt
    val slideSize = if (args.length == 3) windowSize else args(3).toInt
    if (slideSize > windowSize) {
      System.err.println("<slide duration> must be less than or equal to <window duration>")
    }
    val windowDuration = s"$windowSize seconds"
    val slideDuration = s"$slideSize seconds"

    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCountWindowed")
      .getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .option("includeTimestamp", true)
      .load()

    // Split the lines into words, retaining timestamps
    val words = lines.as[(String, Timestamp)].flatMap(line =>
      line._1.split(" ").map(word => (word, line._2))
    ).toDF("word", "timestamp")

    // Group the data by window and word and compute the count of each group
    val windowedCounts = words.groupBy(
      window($"timestamp", windowDuration, slideDuration), $"word"
    ).count().orderBy("window")

    // Start running the query that prints the windowed word counts to the console
    val query = windowedCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}

Я использую это в качестве ввода:

nc -lp 9999
abc
1980-01-19 07:22:50 xyz

Но программа scala будет запускать вывод: mvn exe c: java -Dexe c .mainClass = "wordCountWindowed" -Dexe c .args = "localhost 9999 10 10"

Batch: 1
-------------------------------------------
+------------------------------------------+----------+-----+
|window                                    |word      |count|
+------------------------------------------+----------+-----+
|[1970-01-19 07:22:50, 1970-01-19 07:23:00]|1980-01-19|1    |
|[1970-01-19 07:22:50, 1970-01-19 07:23:00]|07:22:50  |1    |
|[1970-01-19 07:22:50, 1970-01-19 07:23:00]|abc       |1    |
|[1970-01-19 07:22:50, 1970-01-19 07:23:00]|xyz       |1    |
+------------------------------------------+----------+-----+

Кажется, мой ввод метки времени недопустим как обычная строка. Итак, мой вопрос: как ввести в команду "n c", чтобы эта программа могла анализировать ввод с консоли, чтобы быть меткой времени, которая могла бы использоваться как "время события" потоковой структурированной искры?

Спасибо много.

...