Spark Streaming в Java: чтение из двух тем Kafka с использованием одного потребителя с использованием JavaInputDStream - PullRequest
0 голосов
/ 19 марта 2020

У меня есть приложение spark, которое требуется для чтения из двух разных тем, используя одного потребителя, используя Spark Java. Схема ключа и значения сообщения kafka одинакова для обеих тем.

Ниже приведен рабочий процесс:

1. Read messages from both the topics, same groupID, using JavaInputDStream<ConsumerRecord<String, String>> and iterate using foreachRDD
2. Inside the loop, Read offsets, filter messages based on the message key and create JavaRDD<String>
3. Iterate on JavaRDD<String> using mapPartitions
4. Inside mapPartitions loop, iterate over them using forEachRemaining.
5. Perform data enrichment, transformation, etc on the rows inside forEachRemaining loop.
6. commit 

Я хочу понять приведенные ниже вопросы. Пожалуйста, предоставьте свои ответы или поделитесь любой документацией, которая может помочь мне найти ответы.

1. How the messages are received/consumed from two topics(one common group id, same schema both key/value) in one consumer.
Let say the consumer reads data every second. Producer1 produces 50 messages to Topic1 and Producer 2 produces 1000 messages to Topic2.
2. Is it going to read all msgs(1000+50) in one batch and process together in the workflow, OR is it going to read 50 msgs first, process them and then read 1000 msgs and process them.
3. What parameter should i use to control the number of messages being read in one batch per second.
4. Will same group id create any issue while consuming.

Ответы [ 2 ]

0 голосов
/ 24 марта 2020

1. Как сообщения принимаются / потребляются из двух тем (один общий идентификатор группы, одна и та же схема и ключ / значение) в одном потребителе. Допустим, потребитель читает данные каждую секунду. Producer1 создает 50 сообщений для Topic1, а Producer 2 создает 1000 сообщений для Topic2.
Любой потребитель Kafka может упомянуть список тем, поэтому никаких ограничений по этому поводу нет.
Так что если у вас есть один потребитель, он будет отвечает за все разделы как Topic1, так и Topic2.
2. Будет ли он читать все сообщения (1000 + 50) в одном пакете и обрабатывать их вместе в рабочем процессе, ИЛИ собирается ли сначала прочитать 50 сообщений, обработать их, а затем прочитать 1000 сообщений и обработать их?
3. Какой параметр я должен использовать для контроля количества сообщений, читаемых в одном пакете в секунду.
Ответ на оба 2,3 вопроса:
Он получит все сообщения вместе (1050) или даже больше, в зависимости от вашей конфигурации.
Чтобы потребитель мог получать пакетами по 1050 или более, увеличьте max.poll.records (по умолчанию 500) до 1050 (или более); другие конфигурации могут быть узким местом, но вы должны быть согласны с остальными для конфигураций по умолчанию.
4. Будет ли тот же идентификатор группы создавать какие-либо проблемы при потреблении.
Тот же идентификатор группы будет влиять на вас, если вы создадите более одного потребителя, заставляя потребителей разделять разделы, за которые они отвечают, между темами.
Кроме того, если ваш потребитель умирает или останавливается по какой-то причине, вы должны восстановить его с тем же идентификатором группы, таким образом, потребитель «запоминает» последнее использованное смещение и удерживается от точек, которые он остановил.

Если у вас возникнут какие-либо проблемы в отношении вашего потребителя, я предлагаю вам прочитать дополнительную информацию в этой статье , это глава 4 от Kafka: «Полное руководство», в которой подробно рассказывается о потребителях и следует ответить на дополнительные вопросы.
Если вы хотите изучить параметры конфигурации, документация всегда полезна.

0 голосов
/ 24 марта 2020

Официальный документ в Spark Streaming уже объясняет, как использовать несколько тем для идентификатора группы. https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

Collection<String> topics = Arrays.asList("topicA", "topicB");

JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );
  1. Один идентификатор группы и одна и та же схема для обеих тем.
  2. Не уверен в этом, однако, насколько я понимаю, это будет потреблять все сообщения в зависимости от размера пакета.
  3. "spark.streaming.backpressure.enabled" установите это значение как истинное, а "spark.streaming.kafka.maxRatePerPartition" установите его как значение цифры c на основе на этом искра ограничивает количество сообщений, потребляемых от кафки за партию. Также установите продолжительность партии соответственно. https://spark.apache.org/docs/latest/api/java/index.html?org / apache / spark / streaming / api / java / JavaStreamingContext. html
  4. Это полностью зависит от использования вашего приложения.
...