Я в Windows 10 пытаюсь несколько раз прочитать текстовые строки, отделенные '\n'
от источника TCPsocket (пока для целей тестирования) с использованием Spark Streaming (Spark 2.4.4). Слова должны быть подсчитаны, а текущее количество слов регулярно отображается на консоли. Это стандартный тест потоковой передачи Spark, который можно найти в нескольких книгах и публикациях в Интернете, но, похоже, он не работает с источником сокета:
Текстовые строки отправляются из Java-программы, например:
serverOutSock = new ServerSocket(9999);
// Establish connection; wait for Spark to connect
sockOut = serverOutSock.accept();
// Set UTF-8 as format
sockOutput = new OutputStreamWriter(sockOut.getOutputStream(),"UTF-8");
// Multiple Java Strings are now written (thousands of them) like
sockOutput.write(string+'\n');
На принимающей стороне Spark код Scala выглядит следующим образом:
val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._
val socketDF = spark.readStream.format("socket").option("host","localhost").option("port",9999).load
val words = socketDF.as[String].flatMap(_.split(" ")).coalesce(1)
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.trigger(Trigger.Continuous("1 second"))
.outputMode("complete")
.format("console")
.start
.awaitTermination
Итак, я хотел бы получать раз в секунду запись на консоли текущего количества слов.
Но я получаю ошибку:
java.lang.ClassCastException: java.lang.String не может быть приведен к org.apache.spark.unsafe.types.UTF8String
и Spark, похоже, ничего не обрабатывает из источника (из-за исключения приведения исходного ввода?). По крайней мере, ничего не написано на консоли. Что может быть причиной этого?
Ниже приведена полная трассировка стека:
Exception in thread "null-4" java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:46)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at org.apache.spark.sql.execution.streaming.continuous.shuffle.RPCContinuousShuffleWriter.write(RPCContinuousShuffleWriter.scala:51)
at org.apache.spark.sql.execution.streaming.continuous.ContinuousCoalesceRDD$$anonfun$4$$anon$1.run(ContinuousCoalesceRDD.scala:112)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Я попытался удалить coalesque(1)
и заменил триггер Continuous
на триггер ProcessingTime
. Это делает ошибку невозможной, но вывод на консоль становится:
Пакет: 0
+ ----- + ----- + | value | count |+ ----- + ----- + + ----- + ----- +
То есть не выводится, хотя многие слова действительно вставляются в сокет. Кроме того, этот вывод отображается только ondce и намного позже, чем через 1 секунду.