Я запускаю свой структурированный потоковый код в пакетном режиме с интервалом в один час, и после нескольких пакетов (успешно завершенных) смещение изменяется на старое значение смещения и снова начинается чтение старого сообщения. В журнале я получаю предупреждение ниже.
WARN KafkaSource: смещение раздела topicName-29 было изменено с 1092271004 на 35623, некоторые данные могли быть пропущены. Некоторые данные могут быть потеряны, потому что их больше нет в Кафке; либо данные были устаревшими Kafka, либо topi c мог быть удален до обработки всех данных в topi c. Если вы хотите, чтобы в таких случаях ваш потоковый запрос не выполнялся, установите для параметра источника «failOnDataLoss» значение «true».
Build.sbt:
scalaVersion := "2.11.8"
val sparkVersion = "2.3.1"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.kafka" % "kafka-clients" % "0.11.0.1",
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion % "provided"
)
Код:
val readMessages=sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers",BOOTSTRAP_SERVERS)
.option("subscribe", TOPIC_NAME)
.option("failOnDataLoss", "false")
.option("kafka.security.protocol","ssl")
.option("kafka.ssl.keystore.location",propertyMap(CONSUMER_KEYSTORE_LOCATION))
.option("kafka.ssl.keystore.password",propertyMap(CONSUMER_KEYSTORE_PASSWORD))
.option("kafka.ssl.truststore.location", propertyMap(CONSUMER_TRUSTSTORE_LOCATION))
.option("kafka.ssl.truststore.password", propertyMap(CONSUMER_TRUSTSTORE_PASSWORD))
.option("startingOffsets", "earliest")
.option("groupIdPrefix", "kafka-ingestion")
.load()
val streamingQuery=readMessages
.writeStream
.format("text"))
.trigger(Trigger.Once)
.option("path",propertyMap(TARGET_PATH))
.option("checkpointLocation",propertyMap(CHECKPOINT_PATH))
.outputMode("append")
.start()
streamingQuery.awaitTermination()
Я предполагаю, что когда любые инкрементные данные, доступные в kafka topi c, работают нормально. Если нет инкрементных данных, то смещение изменяется на более старое. и начать получать те же данные, которые уже обработаны.
Topi c - это компактный topi c со сроком хранения 7 дней.
Я пробовал с обоими initialOffsets как самые ранние и самые последние. Также попытался изменить failOnDataLoss , но он не работает должным образом.