Мне уже удалось использовать Spark Streaming, как показано ниже.
import org.apache.spark.streaming._
import org.apache.spark._
import org.apache.spark.streaming.StreamingContext._
val conf = new SparkConf().setMaster(“local[2]”).setAppName(“NetworkWordCount”)
val ssc = new StreamingContext(sc, Seconds(30))
val lines = ssc.socketTextStream("localhost", 8888)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
scc.start()
nc -lvp 8888
И я хотел попытаться подсчитать количество слов для данных, которые я получил через сокет, как показано ниже.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._
val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 5)
val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream.outputMode("complete").format("console").start()
Но у меня такая ошибка.
ERROR MicroBatchExecution: Query [id = 36c54725-eeb5-415e-829a-e73c079d47d4, runId = 048f9d74-6456-4b01-b058-a4bd8e105b91] terminated with error
java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at java.net.Socket.connect(Socket.java:556)
at java.net.Socket.<init>(Socket.java:452)
at java.net.Socket.<init>(Socket.java:229)
В чем проблема и различия между двумя способами при потоковой передаче? Спасибо!