Мы бы хотели перечислить все темы Kafka с помощью spring-kafka, чтобы получить результаты, аналогичные команде kafka:
bin/kafka-topics.sh --list --zookeeper localhost:2181
При запуске метода getTopics () в приведенном ниже сервисе мы получаем org.apache.kafka.common.errors.TimeoutException: истекло время ожидания при получении метаданных темы
Конфигурация:
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
Служба:
@Service
public class TopicServiceKafkaImpl implements TopicService {
@Autowired
private ConsumerFactory<String, String> consumerFactory;
@Override
public Set<String> getTopics() {
try (Consumer<String, String> consumer =
consumerFactory.createConsumer()) {
Map<String, List<PartitionInfo>> map = consumer.listTopics();
return map.keySet();
}
}
Кафказапущен, и мы можем успешно отправлять сообщения из нашего приложения в тему.