как обрабатывать TaskRejectedException в SpringBoot - PullRequest
0 голосов
/ 20 сентября 2018

Я обрабатываю сообщения kafka с использованием релиза spring boot 2.0.2 и spring kafka 2.1.10 и вставляю их вasticsearch 6.x.Я получаю пакет из 100 сообщений, которые я хочу вставить в ElasticSearch параллельно.Более того, я использую подтверждение вручную в моем @kafkalistener.Я понимаю, что, когда очередь моего threadPoolTaskExecutor заполнится, я получу TaskRejectedException, я просто ловлю его, сообщаю об этом и возвращаю обратно.

Но предположение ack.acknowledge не будет вызвано, и, следовательно, сообщение будет доставлено kafka.Но, по-видимому, при загрузке сообщений из 30 тыс. Сообщений мне не хватает нескольких (~ 10) сообщений.Интересно, правильно ли я обрабатываю исключения, которые могут вызвать пропущенные сообщения.Любая помощь будет оценена.

Вот мой поток PoolTaskExecutor

@Configuration
@EnableAsync
public class CommonBeanConfig {
    @Bean(name = "threadPoolTaskExecutor")
    public Executor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(20);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("KafaSubscriber-Async#");
        executor.initialize();
        return executor;
    }
}

Вот метод @Async

@Async
    public CompletableFuture<Integer> publishToElasticSearch(String key, EcrAvro avroData) throws Exception {
        logger.warn("ECR avro key=" + key + " - ECR Avro value= " + avroData.toString());
       // check if records exists then update else insert (upsert)
        return CompletableFuture.completedFuture(res.getStatusLine().getStatusCode());
    }

Вот мой @ kafkalistener

@KafkaListener(topics = "${topic}", containerFactory = "ecrAvroListenerFactory")
public void listen(final Acknowledgment ack, final List<ConsumerRecord<String, EcrAvro>> messages) throws Exception {
    try {
        List<CompletableFuture<Integer>> completableFuturesList = new ArrayList<>();
        for (ConsumerRecord<String, EcrAvro> kafkaRecord : messages) {
            String key = kafkaRecord.key();
            EcrAvro avroData = kafkaRecord.value();
            completableFuturesList.add(publishToElasticService.publishToElasticSearch(key, avroData));
        }
        CompletableFuture.allOf(completableFuturesList.toArray(new CompletableFuture[completableFuturesList.size()])).join();
        logger.warn("******all threads joined ..!************\n\n");
        ack.acknowledge();
        logger.warn("******acknowledge done..!************\n\n");
    }catch(TaskRejectedException trje){
        logger.warn("******task rejected!************\n\n");
        throw trje;
    }
}

Мой Consumerconfig показан здесьа также

//Builds the consumer factory, required for @KafkaListener
protected ConcurrentKafkaListenerContainerFactory<Object, Object> buildConcurrentKafkaListenerFactory(String consumerType) {
    Map<String, Object> properties = initializeCommonConsumerConfig();

    properties.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("group.id"));
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    properties.put("schema.registry.url", environment.getProperty("kafka.schema.registry.url"));
    properties.put("specific.avro.reader", "true");

    logger.info("Consumer Factory Properties: " + getPropertyAsString(properties));

    final ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(Integer.parseInt(environment.getProperty("accountupdate.concurrent.consumer.count")));
    factory.setBatchListener(true);
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<Object, Object>(properties));
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
    // by default spring kafka is configured to send ack on error, disabling it
    factory.getContainerProperties().setAckOnError(false);

    return factory;
}

public Map<String, Object> initializeCommonConsumerConfig() {
    HashMap props = new HashMap();
    props.put("bootstrap.servers", environment.getProperty("kafka.bootstrap.servers"));
    props.put("enable.auto.commit", environment.getProperty("enable.auto.commit"));
    props.put("session.timeout.ms", environment.getProperty("session.timeout.ms"));
    props.put("auto.offset.reset", environment.getProperty("auto.offset.reset"));
    props.put("fetch.max.wait.ms", environment.getProperty("fetch.max.wait.ms"));
    props.put("max.partition.fetch.bytes", environment.getProperty("max.partition.fetch.bytes"));
    props.put("max.poll.records", environment.getProperty("max.poll.records"));

    String jaasFile = environment.getProperty("jaasfile");
    System.out.println("Jaas file is " + jaasFile);

    if (jaasFile != null) {
        props.put("security.protocol", environment.getProperty("security.protocol"));
        props.put("sasl.kerberos.service.name", environment.getProperty("sasl.kerberos.service.name"));

        try {
        System.setProperty("java.security.auth.login.config", this.resourceLoader.getResource(jaasFile).getURI().toString());
        System.out.println("java.security.auth.login.config::" + System.getProperty("java.security.auth.login.config"));
        System.setProperty("java.security.krb5.realm", environment.getProperty("realm"));
        System.setProperty("java.security.krb5.kdc", environment.getProperty("kdc"));
        System.setProperty("sun.security.krb5.debug", environment.getProperty("krb.debug"));
        System.setProperty("sun.security.krb5.principal", environment.getProperty("principal"));
        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
    }

    return props;
}

1 Ответ

0 голосов
/ 20 сентября 2018

Вы можете использовать политику отклонения "вызывающих абонентов".

Однако Kafka не отслеживает подтверждения сообщений, просто смещение в теме / разделе.

Если пропустить смещение 9и подтвердите, что смещение 10, 9 никогда не будет доставлено.

Таким образом, вы не можете использовать kafka таким образом.

...