Я создал приложение с использованием структурированной потоковой передачи Apache Kafka и Apache Spark. У меня проблема ниже.
Сценарий :
- Я установил структурированный поток Spark с источником темы Kafka и
топить как кафку тему.
- Мы запускаем поток и выдаем несколько сообщений на Кафке
тема.
- Мы остановили поток и перезапустили поток, сняв контрольные точки
расположение потока. Через 5-6 часов работы поток
потребляет старые сообщения Кафки в случайном порядке.
После очистки местоположения контрольной точки я ожидал только новые сообщения в потоке.
Версия Spark: 2.4.0,
Кафка-версия клиента: 2.0.0,
Кафка версия: 2.0.0,
Менеджер кластера: Кубернетес.
Я пробовал этот сценарий, изменив местоположение контрольной точки, но проблема все еще сохраняется.
{
SparkConf sparkConf = new SparkConf().setAppName("SparkKafkaConsumer");
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
Dataset<Row> stream = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option(subscribeType, "REQUEST_TOPIC")
.option("failOnDataLoss",false)
.option("maxOffsetsPerTrigger","50")
.option("startingOffsets","latest")
.load()
.selectExpr(
"CAST(value AS STRING) as payload",
"CAST(key AS STRING)",
"CAST(topic AS STRING)",
"CAST(partition AS STRING)",
"CAST(offset AS STRING)",
"CAST(timestamp AS STRING)",
"CAST(timestampType AS STRING)");
DataStreamWriter<String> dataWriterStream = stream
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("kafka.max.request.size", "35000000")
.option("kafka.retries", "5")
.option("kafka.batch.size", "35000000")
.option("kafka.receive.buffer.bytes", "200000000")
.option("kafka.acks","0")
.option("kafka.compression.type", "snappy")
.option("kafka.linger.ms", "0")
.option("kafka.buffer.memory", "50000000")
.option("topic", "RESPONSE_TOPIC")
.outputMode("append")
.option("checkpointLocation", checkPointDirectory);
spark.streams().awaitAnyTermination();
} * * тысяча двадцать-один