Структурированная потоковая передача + Кафка: не удается отправить после закрытия производителя - PullRequest
0 голосов
/ 10 октября 2018

Структура потоковой передачи Sparks записывает данные в Kafka, выдавая исключение:

Невозможно отправить после закрытия производителя.

val spark = SparkSession
  .builder()
  .appName("stream source 30361")
  .getOrCreate()

val eventSource_HS_OPT_EWB = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
  .option("subscribe", TOPIC_HS_OPT_EWB)
  .option("maxOffsetsPerTrigger", 10000)
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", false) //[2018-07-13]
  .load()
  .select(from_json(col("value").cast("string"), schema_HS_OPT_EWB).alias("parsed_value"))
  .selectExpr("parsed_value.*")

eventSource_HS_OPT_EWB.createOrReplaceTempView("hsOptEwb_tl")

val exp_bol: DataFrame = spark.sql("select * from hsOptEwb_tl")

var query_exp_bol: StreamingQuery = exp_bol.selectExpr("to_json(struct(*)) AS value")
  .writeStream
  .outputMode("append")
  .format("kafka") // can be "orc", "json", "csv",memory,console etc.
  .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
  .option("topic", TOPIC_EVENT_WAYBILL)
  .option("checkpointLocation", CHECKPOINT_PATH_HS_OPT_EWB)
  .option("kafka.timeout.ms", 120000)
  .option("kafka.request.timeout.ms", 120000)
  .option("kafka.session.timeout.ms", 180000)
  .option("kafka.heartbeat.interval.ms", 60000)
  .option("kafka.retries", 10)
  .option("kafka.max.request.size", 10485760) //134217728//209715200
  .option("kafka.buffer.memory", 10485760)
  .start()
spark.streams.awaitAnyTermination()

kafka: 0,10.1
spark: 2.2
тема: 10 разделов

...