Я обрабатываю сообщения kafka с использованием релиза spring boot 2.0.2 и spring kafka 2.1.10 и вставляю их вasticsearch 6.x.Я получаю пакет из 100 сообщений, которые я хочу вставить в ElasticSearch параллельно.Более того, я использую подтверждение вручную в моем @kafkalistener.Я понимаю, что, когда очередь моего threadPoolTaskExecutor заполнится, я получу TaskRejectedException, я просто ловлю его, сообщаю об этом и возвращаю обратно.
Но предположение ack.acknowledge не будет вызвано, и, следовательно, сообщение будет доставлено kafka.Но, по-видимому, при загрузке сообщений из 30 тыс. Сообщений мне не хватает нескольких (~ 10) сообщений.Интересно, правильно ли я обрабатываю исключения, которые могут вызвать пропущенные сообщения.Любая помощь будет оценена.
Вот мой поток PoolTaskExecutor
@Configuration
@EnableAsync
public class CommonBeanConfig {
@Bean(name = "threadPoolTaskExecutor")
public Executor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("KafaSubscriber-Async#");
executor.initialize();
return executor;
}
}
Вот метод @Async
@Async
public CompletableFuture<Integer> publishToElasticSearch(String key, EcrAvro avroData) throws Exception {
logger.warn("ECR avro key=" + key + " - ECR Avro value= " + avroData.toString());
// check if records exists then update else insert (upsert)
return CompletableFuture.completedFuture(res.getStatusLine().getStatusCode());
}
Вот мой @ kafkalistener
@KafkaListener(topics = "${topic}", containerFactory = "ecrAvroListenerFactory")
public void listen(final Acknowledgment ack, final List<ConsumerRecord<String, EcrAvro>> messages) throws Exception {
try {
List<CompletableFuture<Integer>> completableFuturesList = new ArrayList<>();
for (ConsumerRecord<String, EcrAvro> kafkaRecord : messages) {
String key = kafkaRecord.key();
EcrAvro avroData = kafkaRecord.value();
completableFuturesList.add(publishToElasticService.publishToElasticSearch(key, avroData));
}
CompletableFuture.allOf(completableFuturesList.toArray(new CompletableFuture[completableFuturesList.size()])).join();
logger.warn("******all threads joined ..!************\n\n");
ack.acknowledge();
logger.warn("******acknowledge done..!************\n\n");
}catch(TaskRejectedException trje){
logger.warn("******task rejected!************\n\n");
throw trje;
}
}
Мой Consumerconfig показан здесьа также
//Builds the consumer factory, required for @KafkaListener
protected ConcurrentKafkaListenerContainerFactory<Object, Object> buildConcurrentKafkaListenerFactory(String consumerType) {
Map<String, Object> properties = initializeCommonConsumerConfig();
properties.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("group.id"));
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put("schema.registry.url", environment.getProperty("kafka.schema.registry.url"));
properties.put("specific.avro.reader", "true");
logger.info("Consumer Factory Properties: " + getPropertyAsString(properties));
final ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(Integer.parseInt(environment.getProperty("accountupdate.concurrent.consumer.count")));
factory.setBatchListener(true);
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<Object, Object>(properties));
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
// by default spring kafka is configured to send ack on error, disabling it
factory.getContainerProperties().setAckOnError(false);
return factory;
}
public Map<String, Object> initializeCommonConsumerConfig() {
HashMap props = new HashMap();
props.put("bootstrap.servers", environment.getProperty("kafka.bootstrap.servers"));
props.put("enable.auto.commit", environment.getProperty("enable.auto.commit"));
props.put("session.timeout.ms", environment.getProperty("session.timeout.ms"));
props.put("auto.offset.reset", environment.getProperty("auto.offset.reset"));
props.put("fetch.max.wait.ms", environment.getProperty("fetch.max.wait.ms"));
props.put("max.partition.fetch.bytes", environment.getProperty("max.partition.fetch.bytes"));
props.put("max.poll.records", environment.getProperty("max.poll.records"));
String jaasFile = environment.getProperty("jaasfile");
System.out.println("Jaas file is " + jaasFile);
if (jaasFile != null) {
props.put("security.protocol", environment.getProperty("security.protocol"));
props.put("sasl.kerberos.service.name", environment.getProperty("sasl.kerberos.service.name"));
try {
System.setProperty("java.security.auth.login.config", this.resourceLoader.getResource(jaasFile).getURI().toString());
System.out.println("java.security.auth.login.config::" + System.getProperty("java.security.auth.login.config"));
System.setProperty("java.security.krb5.realm", environment.getProperty("realm"));
System.setProperty("java.security.krb5.kdc", environment.getProperty("kdc"));
System.setProperty("sun.security.krb5.debug", environment.getProperty("krb.debug"));
System.setProperty("sun.security.krb5.principal", environment.getProperty("principal"));
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
return props;
}