Старое смещение Кафки, потребляемое структурированной потоковой передачей после очистки местоположения контрольной точки - PullRequest
0 голосов
/ 17 января 2019

Я создал приложение с использованием структурированной потоковой передачи Apache Kafka и Apache Spark. У меня проблема ниже.

Сценарий :

  • Я установил структурированный поток Spark с источником темы Kafka и топить как кафку тему.
  • Мы запускаем поток и выдаем несколько сообщений на Кафке тема.
  • Мы остановили поток и перезапустили поток, сняв контрольные точки расположение потока. Через 5-6 часов работы поток потребляет старые сообщения Кафки в случайном порядке.

После очистки местоположения контрольной точки я ожидал только новые сообщения в потоке.
Версия Spark: 2.4.0, Кафка-версия клиента: 2.0.0, Кафка версия: 2.0.0, Менеджер кластера: Кубернетес.

Я пробовал этот сценарий, изменив местоположение контрольной точки, но проблема все еще сохраняется.

{
SparkConf sparkConf = new SparkConf().setAppName("SparkKafkaConsumer");
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
Dataset<Row> stream = spark
        .readStream()
        .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option(subscribeType, "REQUEST_TOPIC")
            .option("failOnDataLoss",false)
            .option("maxOffsetsPerTrigger","50")
            .option("startingOffsets","latest")
            .load()
            .selectExpr(
                  "CAST(value AS STRING) as payload",
                  "CAST(key AS STRING)",
                  "CAST(topic AS STRING)",
                  "CAST(partition AS STRING)",
                  "CAST(offset AS STRING)",
                  "CAST(timestamp AS STRING)",
                  "CAST(timestampType AS STRING)");

DataStreamWriter<String>  dataWriterStream = stream
            .writeStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("kafka.max.request.size", "35000000")
            .option("kafka.retries", "5")
            .option("kafka.batch.size", "35000000")
            .option("kafka.receive.buffer.bytes", "200000000")
            .option("kafka.acks","0")
            .option("kafka.compression.type", "snappy")
            .option("kafka.linger.ms", "0")
            .option("kafka.buffer.memory", "50000000")
            .option("topic", "RESPONSE_TOPIC")
            .outputMode("append")
            .option("checkpointLocation", checkPointDirectory);
spark.streams().awaitAnyTermination();

} * * тысяча двадцать-один

1 Ответ

0 голосов
/ 08 февраля 2019

проверьте ссылку ниже,

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-checkpointing.html

Вы вызываете SparkContext.setCheckpointDir (directory: String), чтобы установить каталог контрольных точек - каталог, в котором проверяются RDD.Каталог должен быть путем HDFS, если он работает в кластере.Причина в том, что драйвер может попытаться восстановить RDD с контрольными точками из собственной локальной файловой системы, что неверно, поскольку файлы контрольных точек на самом деле находятся на компьютерах-исполнителях

...