Идея, лежащая в основе этого требования, состоит в том, чтобы проверить, можем ли мы равномерно распределить потребление сообщений между всеми потоками / экземплярами потребителей при использовании параллелизма. Если у потребителя, как показано ниже, параллелизм установлен в '5', могу ли я остановить / приостановить потоки потребителей после того, как каждый из них обработает 10 тыс. записей?
// Example listener: 5 concurrent consumer threads are created for "myTopic"
// (one child container per thread); each received payload is simply printed.
@KafkaListener(id = "myConsumerId", topics = "myTopic", concurrency=5)
public void listen(String in) {
System.out.println(in);
}
Вот моя переменная ThreadLocal:
// Per-thread counter of consumed messages: each of the 5 consumer threads gets
// its own count, starting at 0. ThreadLocal.withInitial replaces the verbose
// anonymous-subclass initialValue() override (the file already uses lambdas).
public static ThreadLocal<Integer> consumedMessages = ThreadLocal.withInitial(() -> 0);
Вот моя логика в потребителе, чтобы увеличивать счетчик и приостанавливать потребителя:
// Target number of records each consumer thread should process before pausing.
int messageCount = 10000;

/**
 * Listener running on 5 concurrent threads. Each thread tracks its own record
 * count in a ThreadLocal and, once it reaches {@code messageCount}, pauses the
 * listener container.
 *
 * NOTE(review): pause() on the ConcurrentMessageListenerContainer pauses ALL
 * child containers, not only the current thread's — confirm that pausing the
 * whole consumer group member is the intended behavior.
 */
@KafkaListener(id = "myConsumerId", topics = "myTopic", concurrency = 5)
public void listen(String in) {
    System.out.println(in);
    // Increment this thread's private counter (auto-unboxing makes intValue() redundant).
    int count = ConsumerPerfTestingConstants.consumedMessages.get() + 1;
    ConsumerPerfTestingConstants.consumedMessages.set(count);
    if (count == messageCount) {
        System.out.println("ATTENTION! ATTENTION! ATTENTION! Consumer Finished processing "+messageCount+" messages");
        // BUG FIX: the original guarded the pause on a DIFFERENT counter
        // (mtpConsumerConsumedMessages) than the one incremented above, so the
        // pause could silently never fire; use the same ThreadLocal consistently.
        // Also use a wildcard-parameterized type instead of the raw type.
        ConcurrentMessageListenerContainer<?, ?> container =
                (ConcurrentMessageListenerContainer<?, ?>) this.registry.getListenerContainer("myConsumerId");
        if (container != null) {
            container.pause();
        }
    }
}
if (concurrentMsgLstnrCntnr != null) {
    // One child container exists per consumer thread (concurrency = 5).
    List<KafkaMessageListenerContainer> childContainers = concurrentMsgLstnrCntnr.getContainers();
    List<ConsumerMetrics> consumerMetricsList = new ArrayList<>();
    for (KafkaMessageListenerContainer childContainer : childContainers) {
        // ConsumerMetrics is my custom bean
        ConsumerMetrics consumerMetrics = new ConsumerMetrics();
        logger.logDebug(String.format("Child Listener Id is %s and assigned partitions are %s ",
                childContainer.getListenerId(),
                childContainer.getAssignedPartitions()), this.getClass());
        // Copy the Kafka client metrics for this child into the bean.
        populateAndPrintContainerMetrics(childContainer.metrics(), consumerMetrics);
        // Keep only containers that actually consumed something (both the rate
        // and the total are formatted doubles, hence the "0.0" comparison).
        boolean consumedNothing = consumerMetrics.getRecordsConsumedRatePerSec().equalsIgnoreCase("0.0")
                || consumerMetrics.getTotalRecordsConsumed().equalsIgnoreCase("0.0");
        if (!consumedNothing) {
            consumerMetricsList.add(consumerMetrics);
        }
    }
}
/**
 * Walks the per-consumer metrics map returned by {@code container.metrics()},
 * prints every raw metric, and copies the metrics of interest into the
 * supplied {@link ConsumerMetrics} bean (mutated in place).
 *
 * @param metrics         map keyed by consumer client id; each value maps a
 *                        Kafka MetricName to its Metric
 * @param consumerMetrics bean to populate; if the outer map has several
 *                        consumer ids, the last one processed wins
 */
private void populateAndPrintContainerMetrics(Map<String, Map<MetricName, ? extends Metric>> metrics, ConsumerMetrics consumerMetrics) {
    // Map.forEach is clearer than entrySet().forEach(); the misspelled local
    // "topLevelMaetricKey" is renamed and the redundant (Metric) cast removed —
    // the value type is already "? extends Metric".
    metrics.forEach((consumerId, metricsByName) -> {
        consumerMetrics.setConsumerId(consumerId);
        System.out.println("metrics map entry key is " + consumerId);
        metricsByName.forEach((metricName, metric) -> {
            String metricKey = metricName.name();
            String metricValue = String.valueOf(metric.metricValue());
            System.out.println(" metricKey is " + metricKey + " and metricValue is " + metricValue);
            switch (metricKey) {
                case "records-consumed-rate":
                    // The average number of records consumed per second
                    consumerMetrics.setRecordsConsumedRatePerSec(metricValue);
                    break;
                case "records-consumed-total":
                    // The total number of records consumed
                    consumerMetrics.setTotalRecordsConsumed(metricValue);
                    break;
                case "request-size-avg":
                    // The average size of requests sent
                    consumerMetrics.setRequestSizeAvg(metricValue);
                    break;
                case "request-rate":
                    // The number of requests sent per second
                    consumerMetrics.setRequestRate(metricValue);
                    break;
                case "request-total":
                    // The total number of requests sent
                    consumerMetrics.setRequestTotal(metricValue);
                    break;
                case "fetch-rate":
                    // The number of fetch requests per second
                    consumerMetrics.setFetchRate(metricValue);
                    break;
                case "fetch-total":
                    // The total number of fetch requests
                    consumerMetrics.setFetchTotal(metricValue);
                    break;
                case "fetch-latency-max":
                    // The max time taken for any fetch request
                    consumerMetrics.setFetchLatencyMax(metricValue);
                    break;
                case "records-per-request-avg":
                    // The average number of records in each request for a topic
                    consumerMetrics.setRecordsPerRequestAvg(metricValue);
                    break;
                case "assigned-partitions":
                    // The number of partitions currently assigned to this consumer
                    consumerMetrics.setAssignedPartitions(metricValue);
                    break;
                case "records-lag-max":
                    // The max lag of the partition
                    consumerMetrics.setRecordsLagMax(metricValue);
                    break;
            }
        });
    });
}
Пожалуйста, предложите решение.