Кафка слушатель не слушает Кафку - PullRequest
2 голосов
/ 09 июля 2020

Я использую Java 11 и kafka-client 2.0.0.

Я использую следующий код для создания потребителя:

    public Consumer createConsumer(Properties properties,String regex) {
        log.info("Creating consumer and listener..");
        Consumer consumer = new KafkaConsumer<>(properties);
        ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                log.info("The following partitions were revoked from consumer : {}", Arrays.toString(partitions.toArray()));
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                log.info("The following partitions were assigned to consumer : {}", Arrays.toString(partitions.toArray()));
            }
        };
        consumer.subscribe(Pattern.compile(regex), listener);
        log.info("consumer subscribed");
        return consumer;
    }
}

Мой опрос l oop находится в другом месте кода:

public <K, V> void startWorking(Consumer<K, V> consumer) {
        try {
            while (true) {
                ConsumerRecords<K, V> records = consumer.poll(600);
                if (records.count() > 0) {
                    log.info("Polled {} records", records.count());

                } else {
                    log.info("polled 0 records.. going to sleep..");
                    Thread.sleep(200);
                }
            }
        } catch (WakeupException | InterruptedException e) {
            log.error("Consumer is shutting down", e);
        } finally {
            consumer.close();
        }
    }

Когда я запускаю код и использую эту функцию, создается потребитель, и журнал содержит следующие сообщения:

    Creating consumer and listener..
    consumer subscribed
polled 0 records.. going to sleep..
polled 0 records.. going to sleep..
polled 0 records.. going to sleep..

Журнал не содержит информации о назначении / отзыве раздела.

Кроме того, я могу видеть в журнале свойства, которые использует потребитель (установлен group.id):

2020-07-09 14:31:07.959 DEBUG 7342 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [server1:9092]
        check.crcs = true
        client.id =
        group.id=mygroupid
        key.deserializer=..
        value.deserializer=..

Итак, я попытался использовать kafka-console-consumer с той же конфигурацией, чтобы использовать одну из тем, которые регулярное выражение (mytopi c. *) Должно улавливать (в этом случае я использовал topi c mytopi c -1):

/usr/bin/kafka-console-consumer.sh --bootstrap-server server1:9092 --topic mytopic-1 --property print.timestamp=true --consumer.config /data/scripts/kafka-consumer.properties  --from-begining

У меня есть опрос l oop в другой части моего кода, который отключается каждые 10 м. Итак, нижняя строка - проблема в эти разделы не назначены потребителю Java. Отпечатков внутри слушателя никогда не происходит, и у потребителя нет разделов для прослушивания.

1 Ответ

0 голосов
/ 14 июля 2020

Кажется, мне не хватало свойства ssl в моем файле свойств. Не забудьте указать security.protocol=ssl, если вы используете ssl. Кажется, что API-интерфейс kafka-client не генерирует исключение, если Kafka использует ssl, а вы пытаетесь получить к нему доступ без настроенного параметра ssl.

...