Использование Spring Kafka ReplyingKafkaTemplate с приложением Kafka Streams - PullRequest
0 голосов
/ 02 октября 2019

Я хотел бы иметь клиентское приложение с семантикой запрос / ответ, которое вызывает другое приложение, которое является приложением Kafka Streams.

Мое клиентское приложение основано на этом примере (практически без изменений),Мне нужно, чтобы приложение, получающее сообщения от клиента, было приложением Kafka Streams. Но заголовки сообщений, включая идентификатор корреляции, теряются.

Приложение Kafka Streams представляет собой простую топологию для тестирования этого ...

    @Bean
    public KafkaStreams stream(KafkaStreamsConfiguration kafkaStreamsConfiguration) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream(REQUEST_TOPIC_NAME)
                .groupByKey()
                .count()
                .toStream()
                .mapValues((ValueMapper<Long, String>)String::valueOf)
                .to(REPLY_TOPIC_NAME);

        return new KafkaStreams(builder.build(), kafkaStreamsConfiguration.asProperties());
    }

Для этого POC я сохраняю это простым, и клиент и сервер "соглашаются" по темеимена (kRequests и kReplies). Поэтому на данный момент я просто хочу, чтобы идентификатор корреляции был распознан и возвращен.

Сейчас я вижу следующее:

2019-10-01 10:55:38.792  WARN 76830 --- [TaskScheduler-1] o.s.k.r.ReplyingKafkaTemplate            : Reply timed out for: ProducerRecord(topic=kRequests, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = kafka_replyTopic, value = [107, 82, 101, 112, 108, 105, 101, 115]), RecordHeader(key = kafka_correlationId, value = [101, -4, -35, 41, -127, -66, 69, 37, -117, -127, -95, -92, 38, 79, 73, 127])], isReadOnly = true), key=null, value=foo21074, timestamp=null) with correlationId: [135564972083657938538225367552235620735]
2019-10-01 10:55:38.792 ERROR 76830 --- [TaskScheduler-1] org.KRequestingApplication  : Reply timed out

org.springframework.kafka.KafkaException: Reply timed out
    at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.lambda$scheduleTimeout$0(ReplyingKafkaTemplate.java:257) ~[spring-kafka-2.2.8.RELEASE.jar:2.2.8.RELEASE]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_211]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_211]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_211]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_211]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_211]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_211]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_211]

Нет сообщения с соответствующим идентификатором корреляции натема ответа в течение тайм-аута. Похоже, что, по крайней мере, с использованием Kafka Streams DSL нет способа поддержать ReplyingKafkaTemplate.

1 Ответ

0 голосов
/ 02 октября 2019

Ваш сценарий не имеет смысла;ваш KStream группирует несколько входов;запрос / ответ 1 запрос 1 ответ.

Это прекрасно работает ...

@SpringBootApplication
@EnableKafkaStreams
public class So58193901Application {

    private static final String REQUEST_TOPIC_NAME = "requests";

    private static final String REPLY_TOPIC_NAME = "replies";

    public static void main(String[] args) {
        SpringApplication.run(So58193901Application.class, args);
    }

    @Bean
    public KStream<byte[], byte[]> stream(StreamsBuilder builder) {
        KStream<byte[], byte[]> stream = builder.stream(REQUEST_TOPIC_NAME);
        stream
                .mapValues(val -> new String(val).toUpperCase().getBytes())
                .to(REPLY_TOPIC_NAME);
        return stream;
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name(REQUEST_TOPIC_NAME).partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name(REPLY_TOPIC_NAME).partitions(1).replicas(1).build();
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> template(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, String> factory) {

        ConcurrentMessageListenerContainer<String, String> replyContainer = factory.createContainer(REPLY_TOPIC_NAME);
        return new ReplyingKafkaTemplate<>(pf, replyContainer);
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            System.out.println(template.sendAndReceive(new ProducerRecord<>(REQUEST_TOPIC_NAME, "bar", "foo"))
                    .get(10, TimeUnit.SECONDS).value());
        };
    }

}
...