Как я могу сообщить метрики Kafka Producer в Prometheus (с помощью весенней загрузки) - PullRequest
4 голосов
/ 30 апреля 2019

Я работаю с пружинной интеграцией для передачи данных от конечной точки UDP к кафке. Я инициализировал replyingKafkaTemplate как @Bean в @Configuration с конфигурациями как потребителя, так и производителя. Когда мой сервер включен и после отправки некоторых запросов udp я могу видеть показатели потребителя. Тем не менее, я не вижу метрики производителя, даже после установки репортера jmx в конфигурации производителя.

Я пытался не устанавливать репортер метрик производителя, предполагая, что он будет автоматически отображаться как метрики потребителя (без дополнительной настройки)

конфигурация производителя

Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                KafkaAvroSerializer.class);
        configProps.put("schema.registry.url", "http://schema-regisry-server:8081");
        configProps.put(
                ProducerConfig.RETRIES_CONFIG,
                3);
        configProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 500);
        configProps.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 5000);
        configProps.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
        configProps.put(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO");

        printConfigProps(configProps);
        return new DefaultKafkaProducerFactory<>(configProps);

потребительская конфигурация

Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        properties.put("schema.registry.url", "http://schema-regisry-server:8081");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "spring-integration");
        // automatically reset the offset to the earliest offset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return properties;

Создание шаблона kafka

@Bean
    public ReplyingKafkaTemplate<String, DataModel, DataModel> replyKafkaTemplate(ProducerFactory<String, DataModel> pf, KafkaMessageListenerContainer<String, DataModel> container) {
        ReplyingKafkaTemplate<String, DataModel, DataModel> template = new ReplyingKafkaTemplate<>(pf, container);
        template.start();
        return template;
    }

Создание контейнера слушателя:

@Bean
    public KafkaMessageListenerContainer<String, DataModel> replyContainer(ConsumerFactory<String, DataModel> cf) {
        ContainerProperties containerProperties = new ContainerProperties(destinationTopic);
        containerProperties.setGroupId("test");
        return new KafkaMessageListenerContainer<>(cf, containerProperties);
    }

Создание ConsumerFactory

@Bean
    public ConsumerFactory<?, ?> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...