Я использую команду "n c -lp 9999" для ввода данных сокета для структурированного потокового ввода, я использую этот файл scala из inte rnet.
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object wordCountWindowed {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
" <window duration in seconds> [<slide duration in seconds>]")
System.exit(1)
}
val host = args(0)
val port = args(1).toInt
val windowSize = args(2).toInt
val slideSize = if (args.length == 3) windowSize else args(3).toInt
if (slideSize > windowSize) {
System.err.println("<slide duration> must be less than or equal to <window duration>")
}
val windowDuration = s"$windowSize seconds"
val slideDuration = s"$slideSize seconds"
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCountWindowed")
.getOrCreate()
import spark.implicits._
// Create DataFrame representing the stream of input lines from connection to host:port
val lines = spark.readStream
.format("socket")
.option("host", host)
.option("port", port)
.option("includeTimestamp", true)
.load()
// Split the lines into words, retaining timestamps
val words = lines.as[(String, Timestamp)].flatMap(line =>
line._1.split(" ").map(word => (word, line._2))
).toDF("word", "timestamp")
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", windowDuration, slideDuration), $"word"
).count().orderBy("window")
// Start running the query that prints the windowed word counts to the console
val query = windowedCounts.writeStream
.outputMode("complete")
.format("console")
.option("truncate", "false")
.start()
query.awaitTermination()
}
}
Я использую это в качестве ввода:
nc -lp 9999
abc
1980-01-19 07:22:50 xyz
Но программа scala будет запускать вывод: mvn exe c: java -Dexe c .mainClass = "wordCountWindowed" -Dexe c .args = "localhost 9999 10 10"
Batch: 1
-------------------------------------------
+------------------------------------------+----------+-----+
|window |word |count|
+------------------------------------------+----------+-----+
|[1970-01-19 07:22:50, 1970-01-19 07:23:00]|1980-01-19|1 |
|[1970-01-19 07:22:50, 1970-01-19 07:23:00]|07:22:50 |1 |
|[1970-01-19 07:22:50, 1970-01-19 07:23:00]|abc |1 |
|[1970-01-19 07:22:50, 1970-01-19 07:23:00]|xyz |1 |
+------------------------------------------+----------+-----+
Кажется, мой ввод метки времени недопустим как обычная строка. Итак, мой вопрос: как ввести в команду "n c", чтобы эта программа могла анализировать ввод с консоли, чтобы быть меткой времени, которая могла бы использоваться как "время события" потоковой структурированной искры?
Спасибо много.