Невозможно зафиксировать смещение вручную в прямом потоке kafka, потоковая передача Spark - PullRequest
1 голос
/ 19 октября 2019

Я пытаюсь проверить работу ручной фиксации смещения.

Когда я пытаюсь выйти из задания либо с помощью thread.sleep () / jssc.stop () /, генерируя исключения в цикле while,Я вижу, что смещения совершаются.

Я просто отправляю пару сообщений для тестирования, но я вижу 0 лагов, как только задание начинает обрабатывать пакет.

Когда Spark фактически фиксирует смещения?

JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

    kafkaStream.foreachRDD(kafkaStreamRDD -> {
                // fetch kafka offsets for manually committing it later
                OffsetRange[] offsetRanges = ((HasOffsetRanges) kafkaStreamRDD.rdd()).offsetRanges();


                // filter unwanted data
                kafkaStreamRDD.filter(new Function<ConsumerRecord<String, String>, Boolean>() {

                    //filter logic here

                }).foreachPartition(kafkaRecords -> {

                    //Initializing DB connections

                    while (kafkaRecords.hasNext()) {

                        //doing some work here

                        //-----> EXCEPTION
                        throw new Exception();
                    }

                });
                // commit offsets saveOffsets after processing
            ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
                if (exception != null) {
                    System.out.println("-------------Unable to commit offsets, something went wrong, trace ------------"+ exception.getCause());
                    exception.printStackTrace(); // need this for driver
                } else {
                    System.out.println("Successfully committed offsets"); // need this for driver
                    for (OffsetRange offsetRange : offsetRanges) {
                        System.out.println("Offset Info: paratition {}, fromOffset {} untilOffset {}: "+ offsetRange.partition() +":"+ offsetRange.fromOffset() +":"+ offsetRange.untilOffset());
                    }
                }
            });

enable.auto.commit: false

Соблюдать throw new Exception(); в цикле while. Даже если пакет завершится неудачно из-за исключения, я вижу зафиксированное смещение, я ожидаю некоторой задержки здесь, поскольку обработка не удалась, что здесь не так?

Ответы [ 2 ]

1 голос
/ 19 октября 2019

Прелесть структурированного потока Spark на Kafka заключается в том, что он обеспечивает ручное смещение, недоступное в Kafka Stream. Spark Stram commit является поточно-ориентированным, асинхронным по своей природе, и, поскольку Kafka не является транзакционным, ваши выводы должны быть идемпотентными. Это означает, что когда вы начинаете использовать сообщение, ваше смещение продолжает увеличиваться, тогда как коммит может появиться позже. Как и в случае с HasOffsetRanges, приведение к CanCommitOffsets будет успешным только в том случае, если оно вызвано результатом createDirectStream, а не после преобразований. Вызов commitAsync является потокобезопасным, но должен произойти после выходных данных.

Вы можете проверить, как выполняется ваш коммит, используя обратный вызов, как показано ниже

stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback() {
            def onComplete(m: java.util.Map[TopicPartition, OffsetAndMetadata], e: Exception) {
              m.foreach(f => {
                if (null != e) {
                  logger.info("Failed to cmomit:" + f._1 + "," + f._2)
                  logger.info("Error while commitAsync. Retry again"+e.toString)
                  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
                } else {
                  println("Offset commit:" + f._1 + "," + f._2)
                }
              })
            }
          })
0 голосов
/ 25 октября 2019

В случае исключений на рабочем узле задание повторно отправляется с максимальным значением spark.task.maxFailures (число сбоев в любой конкретной задаче, прежде чем отказаться от задания). Смещения фиксируются после обработки пакета Dstream. Вы должны обработать исключение (запись в журнал ошибок или пересылку записи в DLQ) в зависимости от вашего варианта использования.

...