Spark Streaming error in a simple example: java.lang.IllegalArgumentException
0 votes
19 January 2019

I am trying to implement a simple word-count example with Spark Streaming: the job listens on localhost:9999 while I send it lines of text with nc -lk 9999.
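For reference, a rough single-connection stand-in for nc -lk 9999, in case it helps make the setup concrete (the FakeNetcat class name is made up and was not part of the original setup; nc -lk also keeps accepting new connections, which this sketch does not):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class FakeNetcat {
    public static void main(String[] args) throws Exception {
        // Listen on port 9999 and forward whatever is typed on stdin to the
        // first client that connects (here, the Spark socket receiver).
        try (ServerSocket server = new ServerSocket(9999);
             Socket client = server.accept();
             PrintWriter out = new PrintWriter(client.getOutputStream(), true);
             BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in))) {
            String line;
            while ((line = stdin.readLine()) != null) {
                out.println(line); // socketTextStream expects newline-delimited text
            }
        }
    }
}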

I have not implemented the word-count logic yet, since I cannot even get this simple code to run. I suspect something is wrong with the way I am trying to receive the stream.

Below are my code and the exception it throws:

Code:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;


public class NetworkWordCount {
    public static void main(String[] args) throws InterruptedException{
        SparkConf conf = new SparkConf()
                .setAppName("NetworkWordCount")
                .setMaster("local");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        lines.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
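As an aside, the log below warns that spark.master should be set as local[n] with n > 1 when receivers are used, because the receiver itself occupies a thread. A sketch of the same program with two local threads (the class name NetworkWordCountLocal2 is hypothetical; this only addresses that resource warning, not necessarily the exception below):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class NetworkWordCountLocal2 {
    public static void main(String[] args) throws InterruptedException {
        // local[2]: one thread for the socket receiver, one for processing,
        // as suggested by the StreamingContext warning in the log below.
        SparkConf conf = new SparkConf()
                .setAppName("NetworkWordCount")
                .setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        lines.print();

        jssc.start();
        jssc.awaitTermination();
    }
}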

Log:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/01/19 11:47:26 INFO SparkContext: Running Spark version 2.2.1
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/root/.m2/repository/org/apache/hadoop/hadoop-auth/2.6.5/hadoop-auth-2.6.5.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
19/01/19 11:47:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/01/19 11:47:27 WARN Utils: Your hostname, imad-Alienware-15-R3 resolves to a loopback address: 127.0.1.1; using 192.168.43.120 instead (on interface wlp61s0)
19/01/19 11:47:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/01/19 11:47:27 INFO SparkContext: Submitted application: NetworkWordCount
19/01/19 11:47:27 INFO SecurityManager: Changing view acls to: root
19/01/19 11:47:27 INFO SecurityManager: Changing modify acls to: root
19/01/19 11:47:27 INFO SecurityManager: Changing view acls groups to: 
19/01/19 11:47:27 INFO SecurityManager: Changing modify acls groups to: 
19/01/19 11:47:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
19/01/19 11:47:27 INFO Utils: Successfully started service 'sparkDriver' on port 46093.
19/01/19 11:47:27 INFO SparkEnv: Registering MapOutputTracker
19/01/19 11:47:27 INFO SparkEnv: Registering BlockManagerMaster
19/01/19 11:47:27 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/01/19 11:47:27 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/01/19 11:47:27 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-cc0cd249-2827-43ee-8497-14cba267d755
19/01/19 11:47:27 INFO MemoryStore: MemoryStore started with capacity 997.2 MB
19/01/19 11:47:27 INFO SparkEnv: Registering OutputCommitCoordinator
19/01/19 11:47:27 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/01/19 11:47:27 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.43.120:4040
19/01/19 11:47:27 INFO Executor: Starting executor ID driver on host localhost
19/01/19 11:47:27 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37585.
19/01/19 11:47:27 INFO NettyBlockTransferService: Server created on 192.168.43.120:37585
19/01/19 11:47:27 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/01/19 11:47:27 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.43.120, 37585, None)
19/01/19 11:47:27 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.43.120:37585 with 997.2 MB RAM, BlockManagerId(driver, 192.168.43.120, 37585, None)
19/01/19 11:47:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.43.120, 37585, None)
19/01/19 11:47:27 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.43.120, 37585, None)
19/01/19 11:47:27 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
19/01/19 11:47:28 INFO ReceiverTracker: Starting 1 receivers
19/01/19 11:47:28 INFO ReceiverTracker: ReceiverTracker started
19/01/19 11:47:28 INFO SocketInputDStream: Slide time = 1000 ms
19/01/19 11:47:28 INFO SocketInputDStream: Storage level = Serialized 1x Replicated
19/01/19 11:47:28 INFO SocketInputDStream: Checkpoint interval = null
19/01/19 11:47:28 INFO SocketInputDStream: Remember interval = 1000 ms
19/01/19 11:47:28 INFO SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@5e72eed9
19/01/19 11:47:28 INFO ForEachDStream: Slide time = 1000 ms
19/01/19 11:47:28 INFO ForEachDStream: Storage level = Serialized 1x Replicated
19/01/19 11:47:28 INFO ForEachDStream: Checkpoint interval = null
19/01/19 11:47:28 INFO ForEachDStream: Remember interval = 1000 ms
19/01/19 11:47:28 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@7779b595
19/01/19 11:47:28 INFO RecurringTimer: Started timer for JobGenerator at time 1547894849000
19/01/19 11:47:28 INFO ReceiverTracker: Receiver 0 started
19/01/19 11:47:28 INFO JobGenerator: Started JobGenerator at 1547894849000 ms
19/01/19 11:47:28 INFO JobScheduler: Started JobScheduler
19/01/19 11:47:28 INFO DAGScheduler: Got job 0 (start at NetworkWordCount.java:28) with 1 output partitions
19/01/19 11:47:28 INFO DAGScheduler: Final stage: ResultStage 0 (start at NetworkWordCount.java:28)
19/01/19 11:47:28 INFO DAGScheduler: Parents of final stage: List()
19/01/19 11:47:28 INFO DAGScheduler: Missing parents: List()
19/01/19 11:47:28 INFO StreamingContext: StreamingContext started
19/01/19 11:47:28 INFO DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:620), which has no missing parents
19/01/19 11:47:28 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 46.5 KB, free 997.2 MB)
19/01/19 11:47:28 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 15.9 KB, free 997.1 MB)
19/01/19 11:47:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.43.120:37585 (size: 15.9 KB, free: 997.2 MB)
19/01/19 11:47:28 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
19/01/19 11:47:28 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:620) (first 15 tasks are for partitions Vector(0))
19/01/19 11:47:28 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
19/01/19 11:47:28 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 5414 bytes)
19/01/19 11:47:28 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
19/01/19 11:47:28 INFO RecurringTimer: Started timer for BlockGenerator at time 1547894848600
19/01/19 11:47:28 INFO BlockGenerator: Started BlockGenerator
19/01/19 11:47:28 INFO BlockGenerator: Started block pushing thread
19/01/19 11:47:28 INFO ReceiverTracker: Registered receiver for stream 0 from 192.168.43.120:46093
19/01/19 11:47:28 INFO ReceiverSupervisorImpl: Starting receiver 0
19/01/19 11:47:28 INFO SocketReceiver: Connecting to localhost:9999
19/01/19 11:47:28 INFO SocketReceiver: Connected to localhost:9999
19/01/19 11:47:28 INFO ReceiverSupervisorImpl: Called receiver 0 onStart
19/01/19 11:47:28 INFO ReceiverSupervisorImpl: Waiting for receiver to be stopped
19/01/19 11:47:28 INFO MemoryStore: Block input-0-1547894848400 stored as bytes in memory (estimated size 55.0 B, free 997.1 MB)
19/01/19 11:47:28 INFO BlockManagerInfo: Added input-0-1547894848400 in memory on 192.168.43.120:37585 (size: 55.0 B, free: 997.2 MB)
19/01/19 11:47:28 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/01/19 11:47:28 WARN BlockManager: Block input-0-1547894848400 replicated to only 0 peer(s) instead of 1 peers
19/01/19 11:47:28 INFO BlockGenerator: Pushed block input-0-1547894848400
19/01/19 11:47:29 INFO JobScheduler: Added jobs for time 1547894849000 ms
19/01/19 11:47:29 INFO JobScheduler: Starting job streaming job 1547894849000 ms.0 from job set of time 1547894849000 ms
19/01/19 11:47:29 INFO JobScheduler: Finished job streaming job 1547894849000 ms.0 from job set of time 1547894849000 ms
19/01/19 11:47:29 ERROR JobScheduler: Error running job streaming job 1547894849000 ms.0
java.lang.IllegalArgumentException
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:443)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:426)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
    at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:257)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:256)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:256)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:735)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:844)
Exception in thread "main" java.lang.IllegalArgumentException
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:443)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:426)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
    at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:257)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:256)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:256)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:735)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:844)
19/01/19 11:47:29 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
19/01/19 11:47:29 INFO ReceiverTracker: Sent stop signal to all 1 receivers
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Received stop signal
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopping receiver with message: Stopped by driver: 
19/01/19 11:47:29 INFO SocketReceiver: Closed socket to localhost:9999
19/01/19 11:47:29 WARN SocketReceiver: Error receiving data
java.net.SocketException: Socket closed
    at java.base/java.net.SocketInputStream.socketRead0(Native Method)
    at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.base/java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.base/java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.base/java.io.InputStreamReader.read(InputStreamReader.java:185)
    at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
    at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:121)
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:119)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:91)
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:72)
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Called receiver onStop
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Deregistering receiver 0
19/01/19 11:47:29 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error receiving data
java.net.SocketException: Socket closed
    at java.base/java.net.SocketInputStream.socketRead0(Native Method)
    at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.base/java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.base/java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.base/java.io.InputStreamReader.read(InputStreamReader.java:185)
    at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
    at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:121)
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:119)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:91)
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:72)
19/01/19 11:47:29 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopped receiver 0
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Error receiving data: java.net.SocketException: Socket closed
19/01/19 11:47:29 WARN ReceiverSupervisorImpl: Receiver has been stopped
19/01/19 11:47:29 INFO BlockGenerator: Stopping BlockGenerator
19/01/19 11:47:29 INFO RecurringTimer: Stopped timer for BlockGenerator after time 1547894849400
19/01/19 11:47:29 INFO BlockGenerator: Waiting for block pushing thread to terminate
19/01/19 11:47:29 INFO BlockGenerator: Pushing out the last 0 blocks
19/01/19 11:47:29 INFO BlockGenerator: Stopped block pushing thread
19/01/19 11:47:29 INFO BlockGenerator: Stopped BlockGenerator
Exception in thread "receiver-supervisor-future-0" java.lang.InterruptedException: sleep interrupted
    at java.base/java.lang.Thread.sleep(Native Method)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply$mcV$sp(ReceiverSupervisor.scala:196)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:189)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:189)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:844)
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopped receiver without error
19/01/19 11:47:29 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 794 bytes result sent to driver
19/01/19 11:47:29 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1063 ms on localhost (executor driver) (1/1)
19/01/19 11:47:29 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
19/01/19 11:47:29 INFO DAGScheduler: ResultStage 0 (start at NetworkWordCount.java:28) finished in 1.085 s
19/01/19 11:47:29 INFO ReceiverTracker: All of the receivers have deregistered successfully
19/01/19 11:47:29 INFO ReceiverTracker: ReceiverTracker stopped
19/01/19 11:47:29 INFO JobGenerator: Stopping JobGenerator immediately
19/01/19 11:47:29 INFO RecurringTimer: Stopped timer for JobGenerator after time 1547894849000
19/01/19 11:47:29 INFO JobGenerator: Stopped JobGenerator
19/01/19 11:47:29 INFO JobScheduler: Stopped JobScheduler
19/01/19 11:47:29 INFO StreamingContext: StreamingContext stopped successfully
19/01/19 11:47:29 INFO SparkContext: Invoking stop() from shutdown hook
19/01/19 11:47:29 INFO SparkUI: Stopped Spark web UI at http://192.168.43.120:4040
19/01/19 11:47:29 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/01/19 11:47:29 INFO MemoryStore: MemoryStore cleared
19/01/19 11:47:29 INFO BlockManager: BlockManager stopped
19/01/19 11:47:29 INFO BlockManagerMaster: BlockManagerMaster stopped
19/01/19 11:47:29 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/01/19 11:47:29 INFO SparkContext: Successfully stopped SparkContext
19/01/19 11:47:29 INFO ShutdownHookManager: Shutdown hook called
19/01/19 11:47:29 INFO ShutdownHookManager: Deleting directory /tmp/spark-74f53f9e-d91c-4870-855c-f4272031804b

Process finished with exit code 1

EDIT and FIX: The problem lies in calling print() on a JavaDStream or JavaPairDStream.

I implemented the word-count script as follows:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;


public class NetworkWordCount {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
                .setAppName("NetworkWordCount")
                .setMaster("local");
        JavaStreamingContext jssc =
                new JavaStreamingContext(conf, Durations.seconds(1));

        JavaReceiverInputDStream<String> lines =
                jssc.socketTextStream("localhost", 9999);

        JavaDStream<String> words =
                lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
        JavaPairDStream<String, Integer> pairs =
                words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairDStream<String, Integer> wordCounts =
                pairs.reduceByKey((i1, i2) -> i1 + i2);

        wordCounts.foreachRDD(rdd -> rdd.foreach(data -> System.out.println(data._1+"-----"+data._2)));
        jssc.start();
        jssc.awaitTermination();
    }
}
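For contrast, the variant below keeps the pipeline identical and only swaps the foreachRDD line for print(); going by the EDIT above, this is the form that raises java.lang.IllegalArgumentException (the class name NetworkWordCountPrint is hypothetical, a reconstruction from the description rather than a verified repro):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

public class NetworkWordCountPrint {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
                .setAppName("NetworkWordCount")
                .setMaster("local");
        JavaStreamingContext jssc =
                new JavaStreamingContext(conf, Durations.seconds(1));

        JavaReceiverInputDStream<String> lines =
                jssc.socketTextStream("localhost", 9999);

        JavaDStream<String> words =
                lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
        JavaPairDStream<String, Integer> pairs =
                words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairDStream<String, Integer> wordCounts =
                pairs.reduceByKey((i1, i2) -> i1 + i2);

        // Only difference from the working version above: print() instead of
        // foreachRDD(...); per the EDIT, this is the call that fails here.
        wordCounts.print();

        jssc.start();
        jssc.awaitTermination();
    }
}

The input and output shown below were produced with the foreachRDD version.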

After starting nc -lk 9999 and typing the following lines:

Hello my man

How you doing

Im not sure this is working

but i hope it doesnt mess up

I get the following results:

First record:

Hello ----- 1

my ----- 1

man ----- 1

Second record:

you ----- 1

How ----- 1

doing ----- 1

Third record:

Im ----- 1

this ----- 1

not ----- 1

is ----- 1

sure ----- 1

working ----- 1

Fourth record:

up ----- 1

it ----- 1

hope ----- 1

mess ----- 1

i ----- 1

doesnt ----- 1

but ----- 1

I cannot explain why this solution works while the version using JavaDStream.print() does not. An explanation would be very helpful.

Many thanks
