Spark ERROR MicroBatchExecution: Query terminated with error java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
0 votes / 04 October 2019

I am trying to send streaming data to Kafka using Spark Structured Streaming. However, the Spark job does not launch any tasks and fails with the error "MicroBatchExecution: Query terminated with error java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext".

I submit the application as follows:

/usr/hdp/current/spark2-client/bin/spark-submit \
  --class com.test.testone.kafkaStream.KafkaProducer \
  --master yarn \
  --deploy-mode client \
  --executor-memory 2G \
  --num-executors 4 \
  --jars /home/yeswanth/jars/spark-sql-kafka-0-10_2.11-2.3.0.jar,/home/yeswanth/jars/kafka-clients-1.0.0.jar \
  /home/yeswanth/jars/kafkaStreams-1.0-SNAPSHOT.jar

The code is below:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType, DoubleType}

val spark = SparkSession
  .builder
  .appName("Spark-Kafka-Integration")
  .master("local[1]")
  .getOrCreate()

// Schema of the incoming CSV records
val mySchema = StructType(Array(
  StructField("id", IntegerType),
  StructField("name", StringType),
  StructField("year", IntegerType),
  StructField("rating", DoubleType),
  StructField("duration", IntegerType)
))

// Read CSV files from the source directory as a streaming DataFrame
val streamingDataFrame = spark.readStream.schema(mySchema).csv("Path")

// Publish each row to Kafka as JSON, keyed by id
streamingDataFrame.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("topic", "topicname")
  .option("kafka.bootstrap.servers", "alpha:6667")
  .option("checkpointLocation", "path")
  .outputMode("append")
  .start()
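
Note that start() returns a StreamingQuery handle and the query runs on a background thread, so nothing in the snippet above blocks the driver. As a sketch only (this is not the code that produced the error, and I am not certain it is the cause here), holding the handle and awaiting it would look like this:

// Sketch: keep the StreamingQuery returned by start() and block the driver
// on it, so main() does not return (and the SparkContext is not stopped)
// while the micro-batches run. "query" is just my name for the handle.
val query = streamingDataFrame
  .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("topic", "topicname")
  .option("kafka.bootstrap.servers", "alpha:6667")
  .option("checkpointLocation", "path")
  .outputMode("append")
  .start()

query.awaitTermination()  // blocks until the streaming query stops or fails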

The stack trace is below:

19/10/14 02:11:19 INFO FileStreamSource: maxFilesPerBatch = None, maxFileAgeMs = 604800000
19/10/14 02:11:19 ERROR MicroBatchExecution: Query [id = c6d99dc8-6dfd-4088-b1fa-7558c7ec8305, runId = 418bd5a2-c9fe-4d6c-8027-997a6a11e298] terminated with error
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
com.test.testone.kafkaStream.KafkaProducer$.main(KafkaProducer.scala:18)
com.test.testone.kafkaStream.KafkaProducer.main(KafkaProducer.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:906)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


The currently active SparkContext was created at:

org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
com.test.testone.kafkaStream.KafkaProducer$.main(KafkaProducer.scala:18)
com.test.testone.kafkaStream.KafkaProducer.main(KafkaProducer.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:906)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
        at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:99)
        at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:88)
        at org.apache.spark.sql.SparkSession.cloneSession(SparkSession.scala:252)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:268)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
19/10/14 02:11:19 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/10/14 02:11:19 INFO MemoryStore: MemoryStore cleared
19/10/14 02:11:19 INFO BlockManager: BlockManager stopped
19/10/14 02:11:19 INFO BlockManagerMaster: BlockManagerMaster stopped
19/10/14 02:11:19 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/10/14 02:11:19 INFO SparkContext: Successfully stopped SparkContext
19/10/14 02:11:19 INFO ShutdownHookManager: Shutdown hook called
19/10/14 02:11:19 INFO ShutdownHookManager: Deleting directory /tmp/spark-8a0488a0-77e0-43ee-a5d3-d50847a9531d
19/10/14 02:11:19 INFO ShutdownHookManager: Deleting directory /tmp/spark-2e344c85-4324-4563-93fa-f5a9392873bb
