Я наблюдаю странное поведение во время работы программы потоковой передачи с искрой. Я использую S3 Bucket для контрольных точек метаданных. В разделе kafka 310 разделов.
Когда я запускаю потоковое задание в первый раз, после завершения каждой пакетной искры создается новый файл с именем batch_id, созданным в каталоге смещения в месте контрольной печати. После успешного завершения нескольких пакетов задание запуска завершается неудачно после нескольких повторных попыток с предупреждением "WARN KafkaMicroBatchReader: 66 - Set (logs-2019-10-04-77, logs-2019-10-04-85, logs-2019-10-04-71, бревна-2019-10-04-93, бревна-2019-10-04-97, бревна-2019-10-04-101, бревна-2019-10-04-89, бревна-2019-10-04-81, бревна-2019-10-04-103, бревна-2019-10-04-104, бревна-2019-10-04-102, бревна-2019-10-04-98, бревна-2019-10-04-94, бревна-2019-10-04-90, бревна-2019-10-04-74, бревна-2019-10-04-78, бревна-2019-10-04-82, бревна-2019-10-04-86, бревна-2019-10-04-99, бревна-2019-10-04-91, бревна-2019-10-04-73, бревна-2019-10-04-79, бревна-2019-10-04-87, бревна-2019-10-04-83, бревна-2019-10-04-75, бревна-2019-10-04-92, бревна-2019-10-04-70, бревна-2019-10-04-96, бревна-2019-10-04-88, бревна-2019-10-04-95, бревна-2019-10-04-100, бревна-2019-10-04-72, бревна-2019-10-04-76, logs-2019-10-04-84, logs-2019-10-04-80). Возможно, некоторые данные были пропущены. Некоторые данные могут быть потеряны, так как их больше нет в Kafka;данные были устарели Кафкой илиВозможно, тема была удалена до обработки всех данных в теме. Если вы хотите, чтобы в таких случаях ваш потоковый запрос не выполнялся, установите для параметра источника «failOnDataLoss» значение «false». "
Здесь надо отметить, что файл смещения предыдущего пакета содержит информацию о разделах всех 310 разделов, но текущий пакетчитает только выбранные разделы (см. предупреждение выше). Я перезапустил задание, установив «.option (« failOnDataLoss », false)», но получил то же предупреждение выше без сбоя задания. Было замечено, что искра обрабатывает корректные смещения для нескольких разделови для остальных разделов он считывал из начального смещения (0). При возникновении этой ошибки не было проблем с подключением к spark-kafka (мы также проверяли журналы kafka).
Может ли кто-нибудь помочь с этим? я делаю что-то не так или что-то упустил?
Ниже приведен фрагмент кода чтения и записи.
val kafkaDF = ss.readStream.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers /*"localhost:9092"*/)
.option("subscribe", logs)
.option("fetchOffset.numRetries",5)
.option("maxOffsetsPerTrigger", 30000000)
.load()
val query = logDS
.writeStream
.foreachBatch {
(batchDS: Dataset[Row], batchId: Long) =>
batchDS.repartition(noofpartitions, batchDS.col("abc"), batchDS.col("xyz")).write.mode(SaveMode.Append).partitionBy("date", "abc", "xyz").format("parquet").saveAsTable(hiveTableName /*"default.logs"*/)
}
.trigger(Trigger.ProcessingTime(1800 + " seconds"))
.option("checkpointLocation", s3bucketpath)
.start()
Заранее спасибо.