Как приостановить / остановить потребительский поток после использования предварительно определенного количества записей, когда для параметра concurrency установлено значение больше 1? - PullRequest
0 голосов
/ 16 июня 2020
• 1000 Идея, лежащая в основе этого требования, состоит в том, чтобы увидеть, можем ли мы равномерно распределить потребление сообщений между всеми потоками / экземплярами потребителей при использовании параллелизма. и у потребителя, как показано ниже, с параллелизмом, установленным на '5', могу ли я остановить / приостановить потоки потребителей после того, как они потребили 10 тыс. записей каждый?
@KafkaListener(id = "myConsumerId", topics = "myTopic", concurrency=5)
    public void listen(String in) {
        System.out.println(in);
    }

Вот моя ThreadLocal var

public static ThreadLocal<Integer> consumedMessages = new ThreadLocal<Integer>() { @Override protected Integer initialValue() { return Integer.valueOf(0); } };

Вот мой лог c в потребителе, чтобы увеличить счетчик и приостановить потребителя.

int messageCount = 10000;

@KafkaListener(id = "myConsumerId", topics = "myTopic", concurrency=5)
        public void listen(String in) {
            System.out.println(in);

            ConsumerPerfTestingConstants.consumedMessages.set(ConsumerPerfTestingConstants.consumedMessages.get() .intValue() + 1); 
            if (ConsumerPerfTestingConstants.consumedMessages.get() == messageCount) { 
                System.out.println("ATTENTION! ATTENTION! ATTENTION! Consumer Finished processing "+messageCount+" messages"); 

                ConcurrentMessageListenerContainer concurrentMsgLstnrCntnr = (ConcurrentMessageListenerContainer) this.registry.getListenerContainer("myConsumerId"); 

                if (ConsumerPerfTestingConstants.mtpConsumerConsumedMessages.get() == messageCount && concurrentMsgLstnrCntnr != null) 
                    concurrentMsgLstnrCntnr.pause();
            }
        }



    if (concurrentMsgLstnrCntnr != null) {
    List<KafkaMessageListenerContainer> list = concurrentMsgLstnrCntnr.getContainers();
    List<ConsumerMetrics> consumerMetricsList = new ArrayList<>();

    list.forEach(childContainer -> {
        // ConsumerMetrics is my custom bean
        ConsumerMetrics consumerMetrics = new ConsumerMetrics();

        logger.logDebug(String.format("Child Listener Id is %s and assigned partitions are %s ",
                childContainer.getListenerId(),
                childContainer.getAssignedPartitions()), this.getClass());

        // This method is to populated the metrics
        populateAndPrintContainerMetrics(childContainer.metrics(), consumerMetrics);

        if (!consumerMetrics.getRecordsConsumedRatePerSec()
                .equalsIgnoreCase("0.0") && !consumerMetrics.getTotalRecordsConsumed().equalsIgnoreCase("0.0")) {
            consumerMetricsList.add(consumerMetrics);
        }
    });
}


private void populateAndPrintContainerMetrics(Map<String, Map<MetricName, ? extends Metric>> metrics, ConsumerMetrics consumerMetrics) {
metrics.entrySet()
        .forEach(entry -> {
            String topLevelMaetricKey = entry.getKey();
            consumerMetrics.setConsumerId(topLevelMaetricKey);
            System.out.println("metrics map entry key is " + topLevelMaetricKey);
            entry.getValue()
                    .entrySet()
                    .forEach(innerMapEntry -> {

                        String metricKey = innerMapEntry.getKey()
                                .name();
                        String metricValue = String.valueOf(((Metric) innerMapEntry.getValue()).metricValue());
                        System.out.println(" metricKey is " + metricKey + " and metricValue is " + metricValue);

                        switch (metricKey) {
                            case "records-consumed-rate":
                                //The average number of records consumed per second
                                consumerMetrics.setRecordsConsumedRatePerSec(metricValue);
                                break;
                            case "records-consumed-total":
                                //The total number of records consumed
                                consumerMetrics.setTotalRecordsConsumed(metricValue);
                                break;
                            case "request-size-avg":
                                //The average size of requests sent
                                consumerMetrics.setRequestSizeAvg(metricValue);
                                break;
                            case "request-rate":
                                //The number of requests sent per second
                                consumerMetrics.setRequestRate(metricValue);
                                break;
                            case "request-total":
                                //The total number of requests sent
                                consumerMetrics.setRequestTotal(metricValue);
                                break;
                            case "fetch-rate":
                                //The number of fetch requests per second
                                consumerMetrics.setFetchRate(metricValue);
                                break;

                            case "fetch-total":
                                //The total number of fetch requests
                                consumerMetrics.setFetchTotal(metricValue);
                                break;
                            case "fetch-latency-max":
                                //The max time taken for any fetch request
                                consumerMetrics.setFetchLatencyMax(metricValue);
                                break;
                            case "records-per-request-avg":
                                //The average number of records in each request for a topic
                                consumerMetrics.setRecordsPerRequestAvg(metricValue);
                                break;
                            case "assigned-partitions":
                                //The number of partitions currently assigned to this consumer
                                consumerMetrics.setAssignedPartitions(metricValue);
                                break;
                            case "records-lag-max":
                                //The max lag of the partitio
                                consumerMetrics.setRecordsLagMax(metricValue);
                                break;
                        }
                    });

        });
}

Пожалуйста, предложите

1 Ответ

0 голосов
/ 16 июня 2020

Идея, лежащая в основе этого требования, состоит в том, чтобы увидеть, можем ли мы равномерно распределить потребление сообщений между всеми потоками / экземплярами потребителей при использовании параллелизма.

Это необычный вариант использования; распределение записей обычно обрабатывается на стороне производителя разделителем.

Тем не менее, вы можете использовать ThreadLocal<AtomicInteger> для отслеживания количества записей.

Вы можете приостановить / остановить отдельные контейнеры, как я описал вчера .

...