Потоки Kafka с идентичным идентификатором группы, использующие одну и ту же запись - PullRequest
0 голосов
/ 14 октября 2018

Мне нужно использовать записи из раздела Kafka в нескольких потоках с уникальными записями в каждом потоке для обработки.У меня следующий код, я не знаю, в чем была ошибка

public class ConsumerThread implements Runnable {
    public String name;
    public ConsumerThread(String name){
        this.name = name;
    }
    public Properties getDefaultProperty(){
        Properties prop = new Properties();
        prop.setProperty("group.id", "4");
        prop.put("enable.auto.commit", "false");
        prop.put("auto.offset.reset", "earliest");
        prop.setProperty("bootstrap.servers", "localhost:9092");
        prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("max.poll.records","150");
        return prop;
    }
    public void run() {
        TopicPartition tp = new TopicPartition("my.topic", 0);
        KafkaConsumer consumer = new KafkaConsumer(getDefaultProperty());
        ArrayList tpList = new ArrayList<TopicPartition>();
        tpList.add(tp);
        consumer.assign(tpList);
        ConsumerRecords poll = consumer.poll(1000);
        Iterator it = poll.iterator();
        consumer.commitAsync();
        while(it.hasNext()){
            ConsumerRecord cr = (ConsumerRecord) it.next();
            System.out.println("From "+this.name+" : "+cr.value());
        }
        consumer.close();
        System.out.println("Thread Exiting "+this.name);
    }
}

Результат

From Thread1 : produced_0
From Thread1 : produced_1
From Thread1 : produced_2
From Thread1 : produced_3
.
.
.
From Thread1 : produced_136
From Thread2 : produced_0
From Thread2 : produced_1
From Thread2 : produced_2
From Thread2 : produced_3
.
.
.


Ожидается:

From Thread1 : produced_0
From Thread1 : produced_1
From Thread1 : produced_2
From Thread1 : produced_3
.
.
.
From Thread1 : produced_136
From Thread2 : produced_4
From Thread2 : produced_5
From Thread2 : produced_6
From Thread2 : produced_137

Ответы [ 2 ]

0 голосов
/ 14 октября 2018

Как сказал Лиор Чага в своем комментарии, вы вручную назначаете разделы тем вашему потребителю.Это не рекомендуемый способ сделать это.Кроме того, кажется, что все ваши потребители используют один и тот же точный идентификатор группы.В этой конфигурации с двумя потребляющими потоками, если хотя бы один из потребителей получил конкретное сообщение, none других потоков не получит это.Если вы хотите, чтобы все потребительские потоки каждый получали свой «набор» сообщений, не прерывая друг друга, то вам нужно дать им разные group.id s.

Чтобы подписаться на тему, чтобы онаобработайте для вас автобалансировку, а затем потребляйте, вы должны сделать что-то вроде этого (взято из javadoc KafkaConsumer, связанного ниже):

 consumer.subscribe(Arrays.asList("foo", "bar"));
 while (true) {
     ConsumerRecords<String, String> records = consumer.poll(100);
     for (ConsumerRecord<String, String> record : records)
         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
 }

Официальные javadoc Kafka имеют гораздо более подробные объяснения: https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

0 голосов
/ 14 октября 2018

Автоматическое присвоение разделов группе потребителей возможно только при использовании метода подписки для потребителя kafka.Однако вы используете assign с определенным тематическим разделом, поэтому вы берете на себя ответственность за назначение определенных разделов различным потребителям (но вы всегда используете один и тот же раздел 0, поэтому все потребители используют один и тот же тематический раздел).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...