Kafka Streams Предупреждение о неподписанных темах - PullRequest
0 голосов
/ 21 января 2020

В моем приложении kafka streams я подписался на список тем, скажем так (Topic1, Topic2, Topic3). В целом, у брокера есть более 3 тем (100), и другие потоковые приложения используют эти темы.

В моем приложении (которое использует Topic1, Topic2 и Topic3) я продолжаю видеть ПРЕДУПРЕЖДЕНИЕ, относящееся к другим темам. также.

14:02:33.910 478517 [Application-468d913c-bca0-4ad9-baec-5b51ffa597d2-StreamThread-4] WARN  o.a.kafka.clients.NetworkClient - [Consumer clientId=Application-468d913c-bca0-4ad9-baec-5b51ffa597d2-StreamThread-4-consumer, groupId=Application] 1389 partitions have leader brokers without a matching listener, including [Topic4, Topic5, Topic6, Topic7, ..., Topic8-Link-Store-changelog-5 ]

сообщение печатается следующим кодом kafka https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

@Override
        public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
            // If any partition has leader with missing listeners, log up to ten of these partitions
            // for diagnosing broker configuration issues.
            // This could be a transient issue if listeners were added dynamically to brokers.
            List<TopicPartition> missingListenerPartitions = response.topicMetadata().stream().flatMap(topicMetadata ->
                topicMetadata.partitionMetadata().stream()
                    .filter(partitionMetadata -> partitionMetadata.error() == Errors.LISTENER_NOT_FOUND)
                    .map(partitionMetadata -> new TopicPartition(topicMetadata.topic(), partitionMetadata.partition())))
                .collect(Collectors.toList());
            if (!missingListenerPartitions.isEmpty()) {
                int count = missingListenerPartitions.size();
                log.warn("{} partitions have leader brokers without a matching listener, including {}",
                        count, missingListenerPartitions.subList(0, Math.min(10, count)));
            }

Ожидается ли это? Почему эти предупреждающие сообщения размещены в моем приложении?

1 Ответ

0 голосов
/ 21 января 2020

Примечание

Это может быть временной проблемой, если слушатели динамически добавляются к брокерам

В любом случае это будет указывать на проблему со свойством listeners определены на брокеров.

Например, вы подключились к PLAINTEXT://kafka:9092, но сервер фактически запускается с PLAINTEXT_SASL://

...