Я пытаюсь отправить потоковые данные в Kafka, используя потоковый метод Spark. Но задание spark не инициирует задачу и не выполняет ее, выдавая сообщение об ошибке как «MicroBatchExecution: Запрос завершен с ошибкой java.lang.IllegalStateException:Невозможно вызвать методы для остановленного SparkContext ".
Я отправляю заявку следующим образом:
/usr/hdp/current/spark2-client/bin/spark-submit \
--class com.test.testone.kafkaStream.KafkaProducer \
--master yarn \
--deploy-mode client \
--executor-memory 2G \
--num-executors 4 \
--jars /home/yeswanth/jars/spark-sql-kafka-0-10_2.11-2.3.0.jar,/home/yeswanth/jars/kafka-clients-1.0.0.jar \
/home/yeswanth/jars/kafkaStreams-1.0-SNAPSHOT.jar
Пожалуйста, найдите код ниже.
val spark = SparkSession
.builder
.appName("Spark-Kafka-Integration")
.master("local[1]")
.getOrCreate()
val mySchema = StructType(Array(
StructField("id", IntegerType),
StructField("name", StringType),
StructField("year", IntegerType),
StructField("rating", DoubleType),
StructField("duration", IntegerType)
))
val streamingDataFrame = spark.readStream.schema(mySchema).csv("Path")
streamingDataFrame.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
.writeStream
.format("kafka")
.option("topic", "topicname")
.option("kafka.bootstrap.servers", "alpha:6667")
.option("checkpointLocation","path")
.outputMode("append")
.start()
Трассировка стекаприведено ниже:
19/10/14 02:11:19 INFO FileStreamSource: maxFilesPerBatch = None, maxFileAgeMs = 604800000
19/10/14 02:11:19 ERROR MicroBatchExecution: Query [id = c6d99dc8-6dfd-4088-b1fa-7558c7ec8305, runId = 418bd5a2-c9fe-4d6c-8027-997a6a11e298] terminated with error
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
com.test.testone.kafkaStream.KafkaProducer$.main(KafkaProducer.scala:18)
com.test.testone.kafkaStream.KafkaProducer.main(KafkaProducer.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:906)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
The currently active SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
com.test.testone.kafkaStream.KafkaProducer$.main(KafkaProducer.scala:18)
com.test.testone.kafkaStream.KafkaProducer.main(KafkaProducer.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:906)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:99)
at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:88)
at org.apache.spark.sql.SparkSession.cloneSession(SparkSession.scala:252)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:268)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
19/10/14 02:11:19 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/10/14 02:11:19 INFO MemoryStore: MemoryStore cleared
19/10/14 02:11:19 INFO BlockManager: BlockManager stopped
19/10/14 02:11:19 INFO BlockManagerMaster: BlockManagerMaster stopped
19/10/14 02:11:19 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/10/14 02:11:19 INFO SparkContext: Successfully stopped SparkContext
19/10/14 02:11:19 INFO ShutdownHookManager: Shutdown hook called
19/10/14 02:11:19 INFO ShutdownHookManager: Deleting directory /tmp/spark-8a0488a0-77e0-43ee-a5d3-d50847a9531d
19/10/14 02:11:19 INFO ShutdownHookManager: Deleting directory /tmp/spark-2e344c85-4324-4563-93fa-f5a9392873bb```