Многопоточный Kafka Consumer не обрабатывает все разделы параллельно - PullRequest
0 голосов
/ 01 марта 2019

Я создал многопоточный потребитель Kafka, в котором каждому разделу назначен один поток (у меня всего 100 разделов).Я перешел по ссылке https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example.

Ниже приведен метод init моего потребителя.

consumer =  kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        System.out.println("Kafka Consumer initialized.");
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topicName, 100);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicName);

        executor = Executors.newFixedThreadPool(100);

В приведенном выше методе init я получил список потоков Kafka (всего100), который должен быть подключен к каждому разделу (что происходит, как и ожидалось).

Затем я отправил каждый из потоков в другой поток, используя приведенный ниже фрагмент.

public object call() {

  for (final KafkaStream stream : streams) {
        executor.execute(new StreamWiseConsumer(stream));
    }
    return true;
  }

Ниже приведен класс StreamWiseConsumer.

public class StreamWiseConsumer extends Thread {

    ConsumerIterator<byte[], byte[]> consumerIterator;
    private KafkaStream m_stream;

    public StreamWiseConsumer(ConsumerIterator<byte[], byte[]> consumerIterator) {
        this.consumerIterator = consumerIterator;
    }

    public StreamWiseConsumer(KafkaStream kafkaStream) {
        this.m_stream = kafkaStream;
    }


    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> consumerIterator  = m_stream.iterator();

        while(!Thread.currentThread().isInterrupted() && !interrupted) {
            try {
                if (consumerIterator.hasNext()) {
                    String reqId = UUID.randomUUID().toString();
                    System.out.println(reqId+ " : Event received by threadId : "+Thread.currentThread().getId());
                    MessageAndMetadata<byte[], byte[]> messageAndMetaData = consumerIterator.next();
                    byte[] keyBytes = messageAndMetaData.key();
                    String key = null;
                    if (keyBytes != null) {
                        key = new String(keyBytes);
                    }
                    byte[] eventBytes = messageAndMetaData.message();
                    if (eventBytes == null){
                        System.out.println("Topic: No event fetched for transaction Id:" + key);
                        continue;
                    }
                    String event = new String(eventBytes).trim();
                    // Some Processing code
                    System.out.println(reqId+" : Processing completed for threadId = "+Thread.currentThread().getId());
                    consumer.commitOffsets();
            } catch (Exception ex) {

            }

        }
    }
}

В идеале он должен начинать обработку со всех 100 разделов параллельно.Но он выбирает случайное число событий из одного из потоков и обрабатывает его, а затем другой поток начинает обработку из другого раздела.Кажется, что это последовательная обработка, но с разными потоками.Я ожидал, что обработка произойдет из всех 100 потоков.Я что-то здесь упускаю?

PFB для ссылки логов.https://drive.google.com/file/d/14b7gqPmwUrzUWewsdhnW8q01T_cQ30ES/view?usp=sharing https://drive.google.com/file/d/1PO_IEsOJFQuerW0y-M9wRUB-1YJuewhF/view?usp=sharing

1 Ответ

0 голосов
/ 01 марта 2019

Я сомневаюсь, что это правильный подход для вертикального масштабирования потоков kafka.

Потоки Kafka по своей природе поддерживают многопоточное потребление.

Увеличение числа потоков, используемых для обработки, с помощью num.streamКонфигурация

...