Исключительная структурированная потоковая передача не работает с темой сжатия журнала kafka - PullRequest
2 голосов
/ 14 марта 2020

Я запускаю свой структурированный потоковый код в пакетном режиме с интервалом в один час, и после нескольких пакетов (успешно завершенных) смещение изменяется на старое значение смещения и снова начинается чтение старого сообщения. В журнале я получаю предупреждение ниже.

WARN KafkaSource: смещение раздела topicName-29 было изменено с 1092271004 на 35623, некоторые данные могли быть пропущены. Некоторые данные могут быть потеряны, потому что их больше нет в Кафке; либо данные были устаревшими Kafka, либо topi c мог быть удален до обработки всех данных в topi c. Если вы хотите, чтобы в таких случаях ваш потоковый запрос не выполнялся, установите для параметра источника «failOnDataLoss» значение «true».

Build.sbt:

scalaVersion := "2.11.8"
val sparkVersion = "2.3.1"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.kafka" % "kafka-clients" % "0.11.0.1",
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion % "provided"
)

Код:

val readMessages=sparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers",BOOTSTRAP_SERVERS)
      .option("subscribe", TOPIC_NAME)
      .option("failOnDataLoss", "false")
      .option("kafka.security.protocol","ssl")
      .option("kafka.ssl.keystore.location",propertyMap(CONSUMER_KEYSTORE_LOCATION))
      .option("kafka.ssl.keystore.password",propertyMap(CONSUMER_KEYSTORE_PASSWORD))
      .option("kafka.ssl.truststore.location", propertyMap(CONSUMER_TRUSTSTORE_LOCATION))
      .option("kafka.ssl.truststore.password", propertyMap(CONSUMER_TRUSTSTORE_PASSWORD))
      .option("startingOffsets", "earliest")
      .option("groupIdPrefix", "kafka-ingestion")
      .load()


val streamingQuery=readMessages
    .writeStream
    .format("text"))
    .trigger(Trigger.Once)
    .option("path",propertyMap(TARGET_PATH))
    .option("checkpointLocation",propertyMap(CHECKPOINT_PATH))
    .outputMode("append")
    .start()

streamingQuery.awaitTermination()

Я предполагаю, что когда любые инкрементные данные, доступные в kafka topi c, работают нормально. Если нет инкрементных данных, то смещение изменяется на более старое. и начать получать те же данные, которые уже обработаны.

Topi c - это компактный topi c со сроком хранения 7 дней.

Я пробовал с обоими initialOffsets как самые ранние и самые последние. Также попытался изменить failOnDataLoss , но он не работает должным образом.

...