2 задания Spark Stream с идентичным идентификатором группы потребителей - PullRequest
0 голосов
/ 27 мая 2018

Я пытаюсь поэкспериментировать с группами потребителей

Вот мой фрагмент кода

public final class App {

private static final int INTERVAL = 5000;

public static void main(String[] args) throws Exception {

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "xxx:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("enable.auto.commit", true);
    kafkaParams.put("auto.commit.interval.ms","1000");
    kafkaParams.put("security.protocol","SASL_PLAINTEXT");
    kafkaParams.put("sasl.kerberos.service.name","kafka");
    kafkaParams.put("retries","3");
    kafkaParams.put(GROUP_ID_CONFIG,"mygroup");
    kafkaParams.put("request.timeout.ms","210000");
    kafkaParams.put("session.timeout.ms","180000");
    kafkaParams.put("heartbeat.interval.ms","3000");
    Collection<String> topics = Arrays.asList("venkat4");

    SparkConf conf = new SparkConf();
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(INTERVAL));


    final JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                    ssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
            );

    stream.mapToPair(
            new PairFunction<ConsumerRecord<String, String>, String, String>() {
                @Override
                public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
                    return new Tuple2<>(record.key(), record.value());
                }
            }).print();


    ssc.start();
    ssc.awaitTermination();


}

}

Когда я запускаю два из этих заданий потоковой передачи одновременно, это не удаетсяс ошибкой

Исключение в потоке "main" java.lang.IllegalStateException: Нет текущего назначения для раздела venkat4-1 в org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState (SubscriptionState.Java: 251) в org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset (SubscriptionState.java:315) в org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd (Kafjakuns70).apache.spark..DStream $$ anonfun $ getOrCompute $ 1 $$ anonfun $ 1 $$ anonfun $ apply $ 7.apply (DStream.scala: 341) в org.apache.spark.streaming.dstream.DStream $$ anonfun $ getOrCompute $ 1 $$ anonfun $ 1 $$ anonfun $ apply $ 7.apply (DStream.scala: 341) в scala.util.DynamicVariable.withValue (DynamicVariable.scala: 58) в org.apache.spark.streaming.dstream.DStream $$ anonfun $ getOrCompute $ 1 $$ anonfun $ 1.apply (DStream.scala: 340) в org.apache.spark.streaming.dstream.DStream $$ anonfun $ getOrCompute $ 1 $$ anonfun $ 1.apply (DStream.scala: 340) в org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties (DStream.scala: 415) в org.apache.spark.streaming.dstream.DStream $$ anonfun $ getOrCompute $ 1.apply (DStream.scala: 335) в org.apache.spark.streaming.dstream.DStream $$ anonfun $ getOrCompute $ 1.apply (DStream.scala: 333) в scala.Option.orElse (Option.scala: 289)

В соответствии с этим https://www.wisdomjobs.com/e-university/apache-kafka-tutorial-1342/apache-kafka-consumer-group-example-19004.html создание отдельного экземпляра потребителя кафки с той же группой приведет к перебалансированию разделов.Я считаю, что этот баланс не терпится потребителем.Как это исправить

Ниже приведена команда

SPARK_KAFKA_VERSION = 0.10 spark2-submit --num-executors 2 --master yarn --deploy-mode client --files jaas.conf# jaas.conf, hive.keytab # hive.keytab --driver-java-options "-Djava.security.auth.login.config =. / jaas.conf" --class Streaming.App --conf "spark.executor.extraJavaOptions = -Djava.security.auth.login.config =. / jaas.conf "--conf spark.streaming.kafka.consumer.cache.enabled = false 1-1.0-SNAPSHOT.jar

Ответы [ 2 ]

0 голосов
/ 02 июня 2018

@ Ravikumar Извиняюсь за задержку.

Мой тест был сделан следующим образом

a.В моей теме 3 раздела б.Работа с искровым потоком была начата с 2 исполнителями - и все работает отлично.с.Позже я решу увеличить его до другого экземпляра, запустив еще одно задание потоковой передачи с 1 исполнителем, чтобы соответствовать моим 3-м разделам, которые не удалось.

Относительно вашего утверждения: когда вы запускаете второе задание потоковой передачи, другой потребитель пытаетсяпотреблять один и тот же раздел из той же группы потребителей.Так что выдает ошибку да, это точно правильно.Но почему это не терпит, вот вопрос.

Цитируя выделенный вами документ:

Кафка назначает разделы темы потребителю в группе, так что каждый раздел являетсяпотребляется ровно одним потребителем в группе.Кафка гарантирует, что сообщение будет прочитано только одним потребителем в группе.Kafka перебалансирует хранилище разделов при сбое любого брокера или добавлении нового раздела в существующую тему.Это специфический для Кафки способ балансировки данных между разделами в брокерах.Kafka восстановит баланс, если будет добавлено больше процессов / потоков.ZooKeeper может быть перенастроен с помощью кластера Kafka, если какой-либо потребитель или брокер не сможет отправить пульс ZooKeeper.

Это то, что я ожидал и от своей работы с потоковым воспроизведением.Я пытался с клиентами нормальной кафки, которые были в состоянии терпеть баланс.

Ваша точка зрения из документа "Кэш имеет ключ topicpartition и group.id, поэтому используйте отдельный group.id для каждого вызова createDirectStream", прояснил мой вопрос.

В дополнение к PR https://github.com/apache/spark/pull/21038 - Следующие указания

«Разделы Kafka могут быть отменены, когда новые потребители объединяются в группу потребителей для изменения баланса разделовНо текущий код соединителя Spark Kafka обеспечивает отсутствие сценариев отмены разделов, поэтому при попытке получить последнее смещение от отмененных разделов будут возникать исключения, о которых упоминалось JIRA . "

Хорошо закрытьэта тема.Большое спасибо за ответ

0 голосов
/ 27 мая 2018

В соответствии с этим https://www.wisdomjobs.com/e-university/apache-kafka-tutorial-1342/apache-kafka-consumer-group-example-19004.html создание отдельного экземпляра потребителя kafka с той же группой приведет к перебалансированию разделов.Я считаю, что этот баланс не терпится потребителем.Как мне исправить это

enter image description here

Теперь все разделы используются только одним потребителем.Если скорость приема данных высока, потребитель может не спешить потреблять данные со скоростью загрузки.

enter image description here

Добавление большего количества потребителей в ту же группу потребителей для потребленияДанные из темы и увеличить уровень потребления.Spark Streaming с использованием этого подхода 1: 1 параллелизм между разделами Kafka и разделами Spark.Spark будет обрабатывать его внутренне.

Если число пользователей превышает количество разделов по темам, оно будет в состоянии простоя, а ресурсы используются не полностью.Всегда рекомендуется, чтобы потребитель был меньше или равен количеству разделов.

Kafka будет перебалансирован, если будет добавлено больше процессов / потоков.ZooKeeper может быть перенастроен с помощью кластера Kafka, если какой-либо потребитель или брокер не может отправить пульс ZooKeeper.

Kafka перебалансирует хранилище разделов при сбое любого брокера или добавлении нового раздела всуществующая тема.Это специфично для Кафки, как сбалансировать данные между разделами в посредниках.

Потоковая передача Spark обеспечивает простой параллелизм 1: 1 между разделами Kafka и разделами Spark.Если вы не предоставляете какие-либо сведения о разделах с помощью ConsumerStragies.Assign, использует из всех разделов данной темы.

Kafka назначает разделы темы потребителю в группе, так что каждый раздел потребляется ровно одним потребителем в группе.Kafka гарантирует, что сообщение будет когда-либо прочитано только одним потребителем в группе.

Когда вы запускаете второе задание потоковой передачи, другой потребитель пытается использовать тот же раздел у того же потребителя.GroupID.Так что выдает ошибку.

val alertTopics = Array("testtopic")

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> sparkJobConfig.kafkaBrokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> sparkJobConfig.kafkaConsumerGroup,
  "auto.offset.reset" -> "latest"
)

val streamContext = new StreamingContext(sparkContext, Seconds(sparkJobConfig.streamBatchInterval.toLong))

val streamData = KafkaUtils.createDirectStream(streamContext, PreferConsistent, Subscribe[String, String](alertTopics, kafkaParams))

Если вы хотите использовать задание искры для конкретного раздела, используйте следующий код:

val topicPartitionsList =  List(new TopicPartition("topic",1))

val alertReqStream1 = KafkaUtils.createDirectStream(streamContext, PreferConsistent, ConsumerStrategies.Assign(topicPartitionsList, kafkaParams))

https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#consumerstrategies

Потребители могут присоединиться к группе с помощью samegroup.id.

val topicPartitionsList =  List(new TopicPartition("topic",3), new TopicPartition("topic",4))

    val alertReqStream2 = KafkaUtils.createDirectStream(streamContext, PreferConsistent, ConsumerStrategies.Assign(topicPartitionsList, kafkaParams))

Добавление еще двух потребителей - это добавление в один и тот же groupid.

Пожалуйста, прочитайте Spark-Kafkaруководство по интеграции.https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html

Надеюсь, это поможет.

...