Промежуточные темы в синхронной кафке: использование Spring Request-Reply - PullRequest
0 голосов
/ 11 апреля 2020

В последнем выпуске spring-kakfa мы пытаемся использовать семантику запрос-ответ и хотели бы знать, можем ли мы использовать промежуточные темы без потери идентификатора корреляции. Одним из наших вариантов использования является получение сообщения от API, создание его для topic1, а также результата для topic2, обработка сообщения в topic2 и отправка его в topic3, где topic3 ​​отправляет окончательный ответ на первоначальный запрос из topic1.

Я не могу связать ответ, полученный от topic3, с запросом на theme1, так как корреляционный идентификатор теряется в промежуточных топиках c. Я могу получить сообщение, если я не использую промежуточный topi c (скажем topic2), затем topic1 отправляет сообщение с correlationid, и соответствующий ответ получен от topic3.

Любые предложения / рекомендации очень полезны.

Ниже приведен пример кода: из моего API я публикую транзакцию

public String postTransaction(String request,Map<String, String> headers) throws InterruptedException, ExecutionException {
    ProducerRecord<String,String> record=new ProducerRecord<String,String>(topic1,"300",request);
    record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,topic3.getBytes()));
    RequestReplyFuture<String,String,String> sendAndReceive=kafkaTemplate.sendAndReceive(record);
    SendResult<String,String> requestMessage=sendAndReceive.getSendFuture().get();
    return sendAndReceive.get().value();
}
#

В другом потребителе я слушаю на topic1 и взяв идентификатор корреляции и создав сообщение на topic2, которое отправит ответ на theme3.

public void listen(Object request, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                   @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
                   @Header(KafkaHeaders.OFFSET) List<Long> offsets,
                   @Header(KafkaHeaders.CORRELATION_ID) byte[] coRlId) throws InterruptedException {

    ProducerRecord<String,String> record=new ProducerRecord<String,String>("topic2","300",k.value());
    record.headers().add(new RecordHeader(KafkaHeaders.CORRELATION_ID,coRlId.get(0).getBytes()));

    kafkaTemplate.send(record);

}

1 Ответ

0 голосов
/ 14 апреля 2020

Я только что проверил это, и он прекрасно работает для меня ...

@SpringBootApplication
public class So61152047Application {

    public static void main(String[] args) {
        SpringApplication.run(So61152047Application.class, args);
    }

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyer(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentMessageListenerContainer<String, String> container = replyContainer(containerFactory);
        ReplyingKafkaTemplate<String, String, String> replyer = new ReplyingKafkaTemplate<>(pf, container);
        return replyer;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic3");
        container.getContainerProperties().setGroupId("three");
        return container;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }


    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> rkt) {
        return args -> {
            ProducerRecord<String, String> pr = new ProducerRecord<>("topic1", "foo", "bar");
            RequestReplyFuture<String, String, String> future = rkt.sendAndReceive(pr);
            System.out.println(future.get(10, TimeUnit.SECONDS).value());
        };
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("topic1").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("topic2").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic3() {
        return TopicBuilder.name("topic3").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "one", topics = "topic1")
    public void listen1(String in,
            @Header(KafkaHeaders.CORRELATION_ID) byte[] corrId) {

        System.out.println(in);
        ProducerRecord<String, String> pr = new ProducerRecord<>("topic2", in.toUpperCase());
        pr.headers().add(new RecordHeader(KafkaHeaders.CORRELATION_ID, corrId));
        this.kafkaTemplate.send(pr);
    }

    @KafkaListener(id = "two", topics = "topic2")
    @SendTo("topic3")
    public String listen2(String in) {
        return in + in;
    }

}

и

bar
BARBAR

Вы также можете распространять заголовок (и) ответа на сообщение .. .

    @KafkaListener(id = "one", topics = "topic1")
    public void listen1(String in,
            @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
            @Header(KafkaHeaders.CORRELATION_ID) byte[] corrId) {

        System.out.println(in);
        ProducerRecord<String, String> pr = new ProducerRecord<>("topic2", in.toUpperCase());
        pr.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyTo));
        pr.headers().add(new RecordHeader(KafkaHeaders.CORRELATION_ID, corrId));
        this.kafkaTemplate.send(pr);
    }

    @KafkaListener(id = "two", topics = "topic2")
    @SendTo // ("topic3")
    public String listen2(String in) {
        return in + in;
    }

РЕДАКТИРОВАТЬ

Для передачи идентификатора корреляции через полезную нагрузку:

public class CorrelatingProducerInterceptor implements ProducerInterceptor<String, Foo> {

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public ProducerRecord<String, Foo> onSend(ProducerRecord<String, Foo> record) {
        Header correlation = record.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
        if (correlation != null) {
            record.value().setCorrelation(correlation.value());
        }
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

}
@SpringBootApplication
public class So61152047Application {

    public static void main(String[] args) {
        SpringApplication.run(So61152047Application.class, args);
    }

    @Autowired
    private KafkaTemplate<String, Foo> kafkaTemplate;

    @Bean
    public ReplyingKafkaTemplate<String, Foo, Foo> replyer(ProducerFactory<String, Foo> pf,
            ConcurrentKafkaListenerContainerFactory<String, Foo> containerFactory) {

        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentMessageListenerContainer<String, Foo> container = replyContainer(containerFactory);
        ReplyingKafkaTemplate<String, Foo, Foo> replyer = new ReplyingKafkaTemplate<>(pf, container);
        return replyer;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, Foo> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, Foo> containerFactory) {

        ConcurrentMessageListenerContainer<String, Foo> container = containerFactory.createContainer("topic3");
        container.getContainerProperties().setGroupId("three");
        return container;
    }

    @Bean
    public KafkaTemplate<String, Foo> kafkaTemplate(ProducerFactory<String, Foo> pf) {
        return new KafkaTemplate<>(pf);
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, Foo, Foo> rkt) {
        return args -> {
            ProducerRecord<String, Foo> pr = new ProducerRecord<>("topic1", "foo", new Foo("bar"));
            RequestReplyFuture<String, Foo, Foo> future = rkt.sendAndReceive(pr);
            System.out.println(future.get(10, TimeUnit.SECONDS).value());
        };
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("topic1").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("topic2").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic3() {
        return TopicBuilder.name("topic3").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "one", topics = "topic1")
    public void listen1(Foo in) {

        System.out.println(in);
        in.setContent(in.getContent().toUpperCase());
        ProducerRecord<String, Foo> pr = new ProducerRecord<>("topic2", in);
        this.kafkaTemplate.send(pr);
    }

    @KafkaListener(id = "two", topics = "topic2")
    public void listen2(Foo in) {
        ProducerRecord<String, Foo> pr = new ProducerRecord<>("topic3", new Foo(in.getContent() + in.getContent()));
        pr.headers().add(new RecordHeader(KafkaHeaders.CORRELATION_ID, in.getCorrelation()));
        this.kafkaTemplate.send(pr);
    }

}

class Foo {

    String content;

    byte[] correlation;

    public Foo() {
    }

    public Foo(String content) {
        this.content = content;
    }

    public String getContent() {
        return this.content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public byte[] getCorrelation() {
        return this.correlation;
    }

    public void setCorrelation(byte[] correlation) {
        this.correlation = correlation;
    }

    @Override
    public String toString() {
        return "Foo [content=" + this.content + "]";
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Foo

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.interceptor.classes=com.example.demo.CorrelatingProducerInterceptor

и

Foo [content=bar]
Foo [content=BARBAR]

Конечно, промежуточное приложение должно передавать идентификатор корреляции, даже если он находится в полезной нагрузке.

...