Список тем Кафки через Spring-Kafka - PullRequest
0 голосов
/ 28 ноября 2018

Мы бы хотели перечислить все темы Kafka с помощью spring-kafka, чтобы получить результаты, аналогичные команде kafka:

bin/kafka-topics.sh --list --zookeeper localhost:2181

При запуске метода getTopics () в приведенном ниже сервисе мы получаем org.apache.kafka.common.errors.TimeoutException: истекло время ожидания при получении метаданных темы

Конфигурация:

@EnableKafka
@Configuration
public class KafkaConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
}

Служба:

@Service
public class TopicServiceKafkaImpl implements TopicService {
    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    @Override
    public Set<String> getTopics() {
        try (Consumer<String, String> consumer = 
            consumerFactory.createConsumer()) {
            Map<String, List<PartitionInfo>> map = consumer.listTopics();
            return map.keySet();
    }
}

Кафказапущен, и мы можем успешно отправлять сообщения из нашего приложения в тему.

Ответы [ 3 ]

0 голосов
/ 29 ноября 2018

kafka-topics --list - это сценарий оболочки, который является просто оболочкой для класса kafka.admin.TopicCommand, где вы можете найти метод, который вы ищете

В качестве альтернативы, вы также можете использоватьAdminClient#listTopics метод

0 голосов
/ 30 июля 2019

Вы можете перечислить такие темы, используя Admin Client

    Properties properties = new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    AdminClient adminClient = AdminClient.create(properties);

    ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
    listTopicsOptions.listInternal(true);

    System.out.println("topics:" + adminClient.listTopics(listTopicsOptions).names().get());
0 голосов
/ 29 ноября 2018

Вы подключаетесь к Zookeeper (2181) вместо Kafka (по умолчанию 9092).

Клиенты Java kafka больше не общаются напрямую с ZK.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...