• 1000 А затем я пытаюсь захватить показатели потребителя, чтобы понять, насколько быстро мой потребитель может потреблять и обрабатывать записи.
Например, я создал 50 тыс. Сообщений на topi c и установил для параллелизма значение 5 , добавляя попытку приостановить потоки потребителей после того, как они потребили 10 тыс. записей. Теперь, когда я пытаюсь зафиксировать потребительские метрики, я получаю повторяющиеся значения и не уверен, какое из них правильное. Итак, может ли кто-нибудь помочь мне понять, как я могу получить правильные показатели моего потребителя?
Вот мой код потребителя
public static ThreadLocal<Integer> consumedMessages = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return Integer.valueOf(0);
}
};
public static List<ConsumerMetrics> mtpConsumerMetricsList = new ArrayList<>();
int messageCount =10000;
@KafkaListener(id = "myConsumer", topics = "myTopic", concurrency = 5)
public void listen(String in) {
try {
consumedMessages.set(consumedMessages.get().intValue() + 1);
System.out.println(" ConsumerConsumedMessages " + consumedMessages.get());
if (consumedMessages.get() == messageCount) {
System.out.println("ATTENTION! ATTENTION! ATTENTION! Consumer Finished processing " + messageCount + " messages");
ConcurrentMessageListenerContainer concurrentMsgLstnrCntnr = (ConcurrentMessageListenerContainer) this.registry.getListenerContainer("myConsumer");
if (concurrentMsgLstnrCntnr != null) {
concurrentMsgLstnrCntnr.pause();
System.out.println("Pausing of mtpConsumer is done at " + LocalDateTime.now());
List<KafkaMessageListenerContainer> list = concurrentMsgLstnrCntnr.getContainers();
System.out.println("Containers list size "+list.size());
List<ConsumerMetrics> consumerMetricsList = new ArrayList<>();
list.forEach(childContainer -> {
ConsumerMetrics consumerMetrics = new ConsumerMetrics();
logger.logDebug(String.format("Child Listener Id is %s and assigned partitions are %s ",
childContainer.getListenerId(),
childContainer.getAssignedPartitions()),
this.getClass());
populateAndPrintContainerMetrics(childContainer.metrics(), consumerMetrics);
if (!consumerMetrics.getRecordsConsumedRatePerSec()
.equalsIgnoreCase("0.0") && !consumerMetrics.getTotalRecordsConsumed()
.equalsIgnoreCase("0.0")) {
consumerMetricsList.add(consumerMetrics);
}
});
mtpConsumerMetricsList.addAll(consumerMetricsList);
} else {
logger.logErrMsg("ERROR! ERROR! ERROR! concurrentMsgLstnrCntnr for Id myConsumer is null",
this.getClass());
}
}
} catch (Exception ex) {
ex.printStackTrace();
logger.logException(ex,
this.getClass());
}
}
private void populateAndPrintContainerMetrics(Map<String, Map<MetricName, ? extends Metric>> metrics, ConsumerMetrics consumerMetrics) {
metrics.entrySet()
.forEach(entry -> {
String topLevelMaetricKey = entry.getKey();
consumerMetrics.setConsumerId(topLevelMaetricKey);
System.out.println("metrics map entry key is " + topLevelMaetricKey);
entry.getValue()
.entrySet()
.forEach(innerMapEntry -> {
String metricKey = innerMapEntry.getKey()
.name();
String metricValue = String.valueOf(((Metric) innerMapEntry.getValue()).metricValue());
System.out.println(" metricKey is " + metricKey + " and metricValue is " + metricValue);
switch (metricKey) {
case "records-consumed-rate":
//The average number of records consumed per second
consumerMetrics.setRecordsConsumedRatePerSec(metricValue);
break;
case "records-consumed-total":
//The total number of records consumed
consumerMetrics.setTotalRecordsConsumed(metricValue);
break;
case "request-size-avg":
//The average size of requests sent
consumerMetrics.setRequestSizeAvg(metricValue);
break;
case "request-rate":
//The number of requests sent per second
consumerMetrics.setRequestRate(metricValue);
break;
case "request-total":
//The total number of requests sent
consumerMetrics.setRequestTotal(metricValue);
break;
case "fetch-rate":
//The number of fetch requests per second
consumerMetrics.setFetchRate(metricValue);
break;
case "fetch-total":
//The total number of fetch requests
consumerMetrics.setFetchTotal(metricValue);
break;
case "fetch-latency-max":
//The max time taken for any fetch request
consumerMetrics.setFetchLatencyMax(metricValue);
break;
case "records-per-request-avg":
//The average number of records in each request for a topic
consumerMetrics.setRecordsPerRequestAvg(metricValue);
break;
case "assigned-partitions":
//The number of partitions currently assigned to this consumer
consumerMetrics.setAssignedPartitions(metricValue);
break;
case "records-lag-max":
//The max lag of the partition
consumerMetrics.setRecordsLagMax(metricValue);
break;
}
});
});
}
Вот результат потребительских показателей, и вы можете ясно видно, что для одного и того же потока приходит более одного результата:
consumerId is producer-perf-test-topic32-ClientId-0 and recordsConsumedRatePerSec is 68.71762342742362 and totalRecordsConsumed is 8715.0
consumerId is producer-perf-test-topic32-ClientId-1 and recordsConsumedRatePerSec is 154.52282723584165 and totalRecordsConsumed is 9064.0
consumerId is producer-perf-test-topic32-ClientId-2 and recordsConsumedRatePerSec is 150.11590645667144 and totalRecordsConsumed is 8807.0
consumerId is producer-perf-test-topic32-ClientId-3 and recordsConsumedRatePerSec is 164.1384476565027 and totalRecordsConsumed is 9641.0
consumerId is producer-perf-test-topic32-ClientId-4 and recordsConsumedRatePerSec is 166.65247526438583 and totalRecordsConsumed is 9786.0
consumerId is producer-perf-test-topic32-ClientId-0 and recordsConsumedRatePerSec is 68.71762342742362 and totalRecordsConsumed is 8715.0
consumerId is producer-perf-test-topic32-ClientId-1 and recordsConsumedRatePerSec is 154.5070230465021 and totalRecordsConsumed is 9064.0
consumerId is producer-perf-test-topic32-ClientId-2 and recordsConsumedRatePerSec is 150.10055561236663 and totalRecordsConsumed is 8807.0
consumerId is producer-perf-test-topic32-ClientId-3 and recordsConsumedRatePerSec is 164.16919252120016 and totalRecordsConsumed is 9641.0
consumerId is producer-perf-test-topic32-ClientId-4 and recordsConsumedRatePerSec is 166.63828627865 and totalRecordsConsumed is 9786.0
consumerId is producer-perf-test-topic32-ClientId-0 and recordsConsumedRatePerSec is 68.52444038373686 and totalRecordsConsumed is 8715.0
consumerId is producer-perf-test-topic32-ClientId-1 and recordsConsumedRatePerSec is 170.05067510118016 and totalRecordsConsumed is 10000.0
consumerId is producer-perf-test-topic32-ClientId-2 and recordsConsumedRatePerSec is 170.0420003740924 and totalRecordsConsumed is 10000.0
consumerId is producer-perf-test-topic32-ClientId-3 and recordsConsumedRatePerSec is 163.85390642261086 and totalRecordsConsumed is 9641.0
consumerId is producer-perf-test-topic32-ClientId-4 and recordsConsumedRatePerSec is 166.39178412935914 and totalRecordsConsumed is 9786.0