Дубликаты при обработке потоковых данных с использованием Kafka-Spark Streaming API - PullRequest
0 голосов
/ 04 февраля 2019

Приведенный ниже код работает и фиксирует смещение после обработки данных.Но проблема в том, что он обрабатывает дубликаты в следующем случае:

Задание потребителя выполняется, и таблица кустов содержит 0 записей, а текущее смещение равно (FORMAT- fromOffest, tillOffset, Difference): 512 512 0

Затем я произвел 1000 записей, и к тому времени, когда он прочитал 34 записи, но не зафиксировал, я уничтожил ее 512 546 34

Я вижу, что к этому времени 34 записи уже были загруженык таблице Hive

Далее я перезапустил приложение.

Я вижу, что оно снова читает 34 записи (вместо чтения 1000-34 = 76 записей), хотя оно уже обработало их изагружен в Hive 512 1512 1000, а затем через несколько секунд он обновляется.1512 1512 0 Улей теперь имеет (34 + 1000 = 1034)

Это приводит к дублированию записей (дополнительно 34) в таблице.Как упоминалось в коде, я фиксирую смещение только после обработки / загрузки в таблицу Hive.

public void method1(SparkConf conf,String app) 
    spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
    final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
            new Duration(<spark duration>));
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));

            JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>() {
                @Override
                public String call(ConsumerRecord<String, String> tuple2) throws Exception {
                    return tuple2.value();
                }
            });

            records.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                @Override
                public void call(JavaRDD<String> rdd) throws Exception {
                    if(!rdd.isEmpty()) {
                        methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
                    }
                }
             });

             messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
              @Override
              public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
                    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                    ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);                     
                    for (OffsetRange offset : offsetRanges) {
                        System.out.println(offset.fromOffset() + " " + offset.untilOffset()+ "  "+offset.count());
                    }
                     }
              });             
    javaStreamContext.start();
    javaStreamContext.awaitTermination();
}

1 Ответ

0 голосов
/ 15 февраля 2019

Вообще говоря, при создании задания Spark Streaming вам не следует беспокоиться о дубликатах, которые должны обрабатываться ниже по течению.Не поймите меня неправильно, вы хотите построить свое приложение для предотвращения дубликатов, но когда произойдут катастрофические вещи, вы получите дубликаты, поэтому лучше справиться с этим позже.

Первая проблема, которую я вижугде вы сохраняете свои смещения.Вы должны сохранять их сразу после сохранения данных, а не в методе после этого.Когда records.foreachRDD завершает methodToSaveData, он должен сделать вызов, чтобы сохранить смещение.Вероятно, вам потребуется изменить структуру отображения записей, чтобы у вас были детали смещения, но это лучшее место для этого.

        records.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> rdd) throws Exception {
                if(!rdd.isEmpty()) {
                    methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
                    **{commit offsets here}**
                }
            }
         });

Тем не менее, не имеет значения, где вы сохраняете смещения.Если задание будет уничтожено после того, как оно записало данные в куст и до того, как диапазон смещения будет зафиксирован, вы собираетесь повторно обработать записи.Существуют определенные способы создания приложения, чтобы оно имело изящные перехватчики завершения работы (Google it), которые пытаются перехватить команду kill и изящно закрыть ее, но опять же, это уязвимо для того, как приложение убивается или падает.Если машина, на которой работает исполнитель, теряет мощность после сохранения в куст, но до фиксации смещения, у вас есть дубликаты.Если приложение убито -9 (в Linux), оно не заботится о корректном завершении работы, и у вас будут дубликаты.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...