Повторяющиеся потребительские метрики, если для параллелизма установлено более 1 - PullRequest
0 голосов
/ 18 июня 2020
• 1000 А затем я пытаюсь захватить показатели потребителя, чтобы понять, насколько быстро мой потребитель может потреблять и обрабатывать записи.

Например, я создал 50 тыс. Сообщений на topi c и установил для параллелизма значение 5 , добавляя попытку приостановить потоки потребителей после того, как они потребили 10 тыс. записей. Теперь, когда я пытаюсь зафиксировать потребительские метрики, я получаю повторяющиеся значения и не уверен, какое из них правильное. Итак, может ли кто-нибудь помочь мне понять, как я могу получить правильные показатели моего потребителя?

Вот мой код потребителя

public static ThreadLocal<Integer> consumedMessages = new ThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() {
            return Integer.valueOf(0);
        }
    };

    public static List<ConsumerMetrics> mtpConsumerMetricsList = new ArrayList<>();

    int messageCount =10000;


    @KafkaListener(id = "myConsumer", topics = "myTopic", concurrency = 5)
    public void listen(String in) {
        try {
            consumedMessages.set(consumedMessages.get().intValue() + 1);
            System.out.println(" ConsumerConsumedMessages " + consumedMessages.get());



            if (consumedMessages.get() == messageCount) {
                System.out.println("ATTENTION! ATTENTION! ATTENTION! Consumer Finished processing " + messageCount + " messages");

                ConcurrentMessageListenerContainer concurrentMsgLstnrCntnr = (ConcurrentMessageListenerContainer) this.registry.getListenerContainer("myConsumer");

                if (concurrentMsgLstnrCntnr != null) {
                    concurrentMsgLstnrCntnr.pause();
                    System.out.println("Pausing of mtpConsumer is done at " + LocalDateTime.now());

                    List<KafkaMessageListenerContainer> list = concurrentMsgLstnrCntnr.getContainers();

                    System.out.println("Containers list size "+list.size());

                    List<ConsumerMetrics> consumerMetricsList = new ArrayList<>();

                    list.forEach(childContainer -> {
                        ConsumerMetrics consumerMetrics = new ConsumerMetrics();

                        logger.logDebug(String.format("Child Listener Id is %s and assigned partitions are %s ",
                                childContainer.getListenerId(),
                                childContainer.getAssignedPartitions()),
                                this.getClass());

                        populateAndPrintContainerMetrics(childContainer.metrics(), consumerMetrics);

                        if (!consumerMetrics.getRecordsConsumedRatePerSec()
                                .equalsIgnoreCase("0.0") && !consumerMetrics.getTotalRecordsConsumed()
                                .equalsIgnoreCase("0.0")) {
                            consumerMetricsList.add(consumerMetrics);
                        }
                    });

                    mtpConsumerMetricsList.addAll(consumerMetricsList);
                } else {
                    logger.logErrMsg("ERROR! ERROR! ERROR! concurrentMsgLstnrCntnr for Id myConsumer is null",
                            this.getClass());
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            logger.logException(ex,
                    this.getClass());

        }
    }

    private void populateAndPrintContainerMetrics(Map<String, Map<MetricName, ? extends Metric>> metrics, ConsumerMetrics consumerMetrics) {
        metrics.entrySet()
                .forEach(entry -> {
                    String topLevelMaetricKey = entry.getKey();
                    consumerMetrics.setConsumerId(topLevelMaetricKey);
                    System.out.println("metrics map entry key is " + topLevelMaetricKey);
                    entry.getValue()
                            .entrySet()
                            .forEach(innerMapEntry -> {

                                String metricKey = innerMapEntry.getKey()
                                        .name();
                                String metricValue = String.valueOf(((Metric) innerMapEntry.getValue()).metricValue());
                                System.out.println(" metricKey is " + metricKey + " and metricValue is " + metricValue);

                                switch (metricKey) {
                                    case "records-consumed-rate":
                                        //The average number of records consumed per second
                                        consumerMetrics.setRecordsConsumedRatePerSec(metricValue);
                                        break;
                                    case "records-consumed-total":
                                        //The total number of records consumed
                                        consumerMetrics.setTotalRecordsConsumed(metricValue);
                                        break;
                                    case "request-size-avg":
                                        //The average size of requests sent
                                        consumerMetrics.setRequestSizeAvg(metricValue);
                                        break;
                                    case "request-rate":
                                        //The number of requests sent per second
                                        consumerMetrics.setRequestRate(metricValue);
                                        break;
                                    case "request-total":
                                        //The total number of requests sent
                                        consumerMetrics.setRequestTotal(metricValue);
                                        break;
                                    case "fetch-rate":
                                        //The number of fetch requests per second
                                        consumerMetrics.setFetchRate(metricValue);
                                        break;

                                    case "fetch-total":
                                        //The total number of fetch requests
                                        consumerMetrics.setFetchTotal(metricValue);
                                        break;
                                    case "fetch-latency-max":
                                        //The max time taken for any fetch request
                                        consumerMetrics.setFetchLatencyMax(metricValue);
                                        break;
                                    case "records-per-request-avg":
                                        //The average number of records in each request for a topic
                                        consumerMetrics.setRecordsPerRequestAvg(metricValue);
                                        break;
                                    case "assigned-partitions":
                                        //The number of partitions currently assigned to this consumer
                                        consumerMetrics.setAssignedPartitions(metricValue);
                                        break;
                                    case "records-lag-max":
                                        //The max lag of the partition
                                        consumerMetrics.setRecordsLagMax(metricValue);
                                        break;
                                }
                            });

                });
    }

Вот результат потребительских показателей, и вы можете ясно видно, что для одного и того же потока приходит более одного результата:

consumerId is producer-perf-test-topic32-ClientId-0 and recordsConsumedRatePerSec is 68.71762342742362 and totalRecordsConsumed is 8715.0 

consumerId is producer-perf-test-topic32-ClientId-1 and recordsConsumedRatePerSec is 154.52282723584165 and totalRecordsConsumed is 9064.0 

consumerId is producer-perf-test-topic32-ClientId-2 and recordsConsumedRatePerSec is 150.11590645667144 and totalRecordsConsumed is 8807.0 

consumerId is producer-perf-test-topic32-ClientId-3 and recordsConsumedRatePerSec is 164.1384476565027 and totalRecordsConsumed is 9641.0 

consumerId is producer-perf-test-topic32-ClientId-4 and recordsConsumedRatePerSec is 166.65247526438583 and totalRecordsConsumed is 9786.0 

consumerId is producer-perf-test-topic32-ClientId-0 and recordsConsumedRatePerSec is 68.71762342742362 and totalRecordsConsumed is 8715.0 

consumerId is producer-perf-test-topic32-ClientId-1 and recordsConsumedRatePerSec is 154.5070230465021 and totalRecordsConsumed is 9064.0 

consumerId is producer-perf-test-topic32-ClientId-2 and recordsConsumedRatePerSec is 150.10055561236663 and totalRecordsConsumed is 8807.0 

consumerId is producer-perf-test-topic32-ClientId-3 and recordsConsumedRatePerSec is 164.16919252120016 and totalRecordsConsumed is 9641.0

consumerId is producer-perf-test-topic32-ClientId-4 and recordsConsumedRatePerSec is 166.63828627865 and totalRecordsConsumed is 9786.0 

consumerId is producer-perf-test-topic32-ClientId-0 and recordsConsumedRatePerSec is 68.52444038373686 and totalRecordsConsumed is 8715.0 

consumerId is producer-perf-test-topic32-ClientId-1 and recordsConsumedRatePerSec is 170.05067510118016 and totalRecordsConsumed is 10000.0 

consumerId is producer-perf-test-topic32-ClientId-2 and recordsConsumedRatePerSec is 170.0420003740924 and totalRecordsConsumed is 10000.0 

consumerId is producer-perf-test-topic32-ClientId-3 and recordsConsumedRatePerSec is 163.85390642261086 and totalRecordsConsumed is 9641.0 

consumerId is producer-perf-test-topic32-ClientId-4 and recordsConsumedRatePerSec is 166.39178412935914 and totalRecordsConsumed is 9786.0 

1 Ответ

0 голосов
/ 18 июня 2020

Непонятно, о чем вы спрашиваете - вы запускаете этот код во всех 5 потребительских потоках, поэтому вы получите 5x5 = 25 строк (каждая отображается 5 раз).

Здесь я всего 5 строк, независимо от того, какой метод я использую для получения показателей:

@SpringBootApplication
public class So62452699Application {

    public static void main(String[] args) {
        SpringApplication.run(So62452699Application.class, args);
    }

    @KafkaListener(id = "so62452699", topics = "so62452699", concurrency = "5")
    public void listen(String in) {
        System.out.println(in);
    }


    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so62452699").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
        return args -> {

            // this one's more concise 

            registry.getListenerContainer("so62452699").metrics().forEach((clientId, metrics) -> {
                System.out.println(clientId + ": " + metrics);
            });

            // or

            ((ConcurrentMessageListenerContainer<?, ?>) registry.getListenerContainer("so62452699"))
                    .getContainers()
                    .forEach(container -> System.out.println(container.metrics()));
        };
    }

}
...