Spark Streaming, чтение из Socket: java.lang.ClassCastException: java.lang.String не может быть приведен к org.apache.spark.unsafe.types.UTF8String - PullRequest
1 голос
/ 08 ноября 2019

Я в Windows 10 пытаюсь несколько раз прочитать текстовые строки, отделенные '\n' от источника TCPsocket (пока для целей тестирования) с использованием Spark Streaming (Spark 2.4.4). Слова должны быть подсчитаны, а текущее количество слов регулярно отображается на консоли. Это стандартный тест потоковой передачи Spark, который можно найти в нескольких книгах и публикациях в Интернете, но, похоже, он не работает с источником сокета:

Текстовые строки отправляются из Java-программы, например:

serverOutSock = new ServerSocket(9999);
// Establish connection; wait for Spark to connect
sockOut = serverOutSock.accept();
// Set UTF-8 as format
sockOutput = new OutputStreamWriter(sockOut.getOutputStream(),"UTF-8");
// Multiple Java Strings are now written (thousands of them) like
sockOutput.write(string+'\n');

На принимающей стороне Spark код Scala выглядит следующим образом:

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._
val socketDF = spark.readStream.format("socket").option("host","localhost").option("port",9999).load
val words = socketDF.as[String].flatMap(_.split(" ")).coalesce(1)
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
      .trigger(Trigger.Continuous("1 second"))
      .outputMode("complete")
      .format("console")
      .start
      .awaitTermination

Итак, я хотел бы получать раз в секунду запись на консоли текущего количества слов.

Но я получаю ошибку:

java.lang.ClassCastException: java.lang.String не может быть приведен к org.apache.spark.unsafe.types.UTF8String

и Spark, похоже, ничего не обрабатывает из источника (из-за исключения приведения исходного ввода?). По крайней мере, ничего не написано на консоли. Что может быть причиной этого?

Ниже приведена полная трассировка стека:

Exception in thread "null-4" java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:46)
        at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
        at org.apache.spark.sql.execution.streaming.continuous.shuffle.RPCContinuousShuffleWriter.write(RPCContinuousShuffleWriter.scala:51)
        at org.apache.spark.sql.execution.streaming.continuous.ContinuousCoalesceRDD$$anonfun$4$$anon$1.run(ContinuousCoalesceRDD.scala:112)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Я попытался удалить coalesque(1) и заменил триггер Continuous на триггер ProcessingTime. Это делает ошибку невозможной, но вывод на консоль становится:


Пакет: 0

+ ----- + ----- + | value | count |+ ----- + ----- + + ----- + ----- +

То есть не выводится, хотя многие слова действительно вставляются в сокет. Кроме того, этот вывод отображается только ondce и намного позже, чем через 1 секунду.

...