AggregatingReplyingKafkaTemplate releaseStrategy Вопрос - PullRequest
0 голосов
/ 31 марта 2020

Кажется, есть проблема, когда я использую AggregatingReplyingKafkaTemplate с template.setReturnPartialOnTimeout (true) в том, что он возвращает исключение тайм-аута, даже если частичные результаты доступны от потребителей.

В примере ниже, у меня есть 3 потребителя чтобы ответить на запрос topi c, и я установил время ожидания ответа на 10 секунд. Я явно отложил ответ Потребителя 3 на 11 секунд, однако я ожидаю ответ от Потребителя 1 и 2, поэтому я могу вернуть частичные результаты. Тем не менее, я получаю KafkaReplyTimeoutException. Ценю ваш вклад. Спасибо.

Я следую коду, основанному на модульном тесте ниже. [ReplyingKafkaTemplateTests] [1]

Я предоставил приведенный ниже код:


@RestController
public class SumController {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    public static final String D_REPLY = "dReply";

    public static final String D_REQUEST = "dRequest";

    @ResponseBody
    @PostMapping(value="/sum")
    public String sum(@RequestParam("message") String message) throws InterruptedException, ExecutionException {

        AggregatingReplyingKafkaTemplate<Integer, String, String> template = aggregatingTemplate(
                new TopicPartitionOffset(D_REPLY, 0), 3, new AtomicInteger());
        String resultValue ="";
        String currentValue ="";

        try {
            template.setDefaultReplyTimeout(Duration.ofSeconds(10));
            template.setReturnPartialOnTimeout(true);

            ProducerRecord<Integer, String> record = new ProducerRecord<>(D_REQUEST, null, null, null, message);

            RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
                    template.sendAndReceive(record);

            future.getSendFuture().get(5, TimeUnit.SECONDS); // send ok
            System.out.println("Send Completed Successfully");

            ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord = future.get(10, TimeUnit.SECONDS);
            System.out.println("Consumer record size "+consumerRecord.value().size());

            Iterator<ConsumerRecord<Integer, String>> iterator = consumerRecord.value().iterator();

            while (iterator.hasNext()) {
                currentValue = iterator.next().value();
                System.out.println("response " + currentValue);
                System.out.println("Record header " + consumerRecord.headers().toString());
                resultValue = resultValue + currentValue + "\r\n";
            }


        } catch (Exception e) {
            System.out.println("Error Message is "+e.getMessage());
        } 

        return resultValue;

    }

    public AggregatingReplyingKafkaTemplate<Integer, String, String> aggregatingTemplate(
            TopicPartitionOffset topic, int releaseSize, AtomicInteger releaseCount) {
        //Create Container Properties
        ContainerProperties containerProperties = new ContainerProperties(topic);
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        //Set the consumer Config
        //Create Consumer Factory with Consumer Config
        DefaultKafkaConsumerFactory<Integer, Collection<ConsumerRecord<Integer, String>>> cf =
                new DefaultKafkaConsumerFactory<>(consumerConfigs());

        //Create Listener Container with Consumer Factory and Container Property
        KafkaMessageListenerContainer<Integer, Collection<ConsumerRecord<Integer, String>>> container =
                new KafkaMessageListenerContainer<>(cf, containerProperties);
        //  container.setBeanName(this.testName);
        AggregatingReplyingKafkaTemplate<Integer, String, String> template =
                new AggregatingReplyingKafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()), container,
                        (list, timeout) -> {
                            releaseCount.incrementAndGet();
                            return list.size() == releaseSize;
                        });
        template.setSharedReplyTopic(true);
        template.start();
        return template;
    }


    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_id");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
        return props;
    }

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
        return props;
    }

    public ProducerFactory<Integer,String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @KafkaListener(id = "def1", topics = { D_REQUEST}, groupId = "D_REQUEST1")
    @SendTo  // default REPLY_TOPIC header
    public String dListener1(String in) throws InterruptedException {
        return "First Consumer : "+ in.toUpperCase();
    }

    @KafkaListener(id = "def2", topics = { D_REQUEST}, groupId = "D_REQUEST2")
    @SendTo  // default REPLY_TOPIC header
    public String dListener2(String in) throws InterruptedException {
        return "Second Consumer : "+ in.toLowerCase();
    }

    @KafkaListener(id = "def3", topics = { D_REQUEST}, groupId = "D_REQUEST3")
    @SendTo  // default REPLY_TOPIC header
    public String dListener3(String in) throws InterruptedException {
        Thread.sleep(11000);
        return "Third Consumer : "+ in;
    }

}
'''


  [1]: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

1 Ответ

0 голосов
/ 31 марта 2020

template.setReturnPartialOnTimeout(true) просто означает, что шаблон будет использовать стратегию выпуска по таймауту (с аргументом timeout = true, чтобы сообщить стратегии, что это тайм-аут, а не вызов доставки).

Для получения частичного результата должно возвращаться значение true.

Это позволяет вам просмотреть (и, возможно, изменить) список, чтобы решить, хотите ли вы отменить или отменить.

Ваша стратегия игнорирует параметр timeout:

   (list, timeout) -> {
        releaseCount.incrementAndGet();
        return list.size() == releaseSize;
    });

Вам нужно return timeout ? true : { ... }.

...