Это кажется очень простой реализацией, но похоже, что есть некоторые проблемы.
Это задание считывает смещения (данные событий пользовательского интерфейса) из темы kafka, выполняет некоторую агрегацию и записывает ее в базу данных Aerospike.
В случае высокого трафика я начинаю видеть эту проблему, когда задание работает нормально, но новые данные не вставляются.Глядя на логи, я вижу это ПРЕДУПРЕЖДАЮЩИЕ сообщения:
Текущая партия отстает.Интервал запуска составляет 30000 миллисекунд, но затрачено 43491 миллисекунда
Несколько раз задание возобновляет запись данных, но я вижу, что число отсчетов низкое, что указывает на некоторую потерю данных.
Здесьэто код:
Dataset<Row> stream = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrapServersString)
.option("subscribe", newTopic)
.option("startingOffsets", "latest")
.option("enable.auto.commit", false)
.option("failOnDataLoss", false)
.load();
StreamingQuery query = stream
.writeStream()
.option("startingOffsets", "earliest")
.outputMode(OutputMode.Append())
.foreach(sink)
.trigger(Trigger.ProcessingTime(triggerInterval))
.queryName(queryName)
.start();