Как я могу решить ОШИБКУ MicroBatchExecution с потоковой передачей сокетов в искре с scala? - PullRequest
0 голосов
/ 07 августа 2020

Мне уже удалось использовать Spark Streaming, как показано ниже.

import org.apache.spark.streaming._
import org.apache.spark._
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf().setMaster(“local[2]”).setAppName(“NetworkWordCount”)    
val ssc = new StreamingContext(sc, Seconds(30))
val lines = ssc.socketTextStream("localhost", 8888)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
scc.start()

nc -lvp 8888

И я хотел попытаться подсчитать количество слов для данных, которые я получил через сокет, как показано ниже.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._

 val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
 spark.conf.set("spark.sql.shuffle.partitions", 5)

 val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

 val words = lines.as[String].flatMap(_.split(" "))
 val wordCounts = words.groupBy("value").count()
 val query = wordCounts.writeStream.outputMode("complete").format("console").start()

Но у меня такая ошибка.

  ERROR MicroBatchExecution: Query [id = 36c54725-eeb5-415e-829a-e73c079d47d4, runId = 048f9d74-6456-4b01-b058-a4bd8e105b91] terminated with error
 java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at 
 java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at 
 java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at java.net.Socket.connect(Socket.java:556)
    at java.net.Socket.<init>(Socket.java:452)
    at java.net.Socket.<init>(Socket.java:229)

В чем проблема и различия между двумя способами при потоковой передаче? Спасибо!

...