Я хотел бы иметь клиентское приложение с семантикой запрос / ответ, которое вызывает другое приложение, которое является приложением Kafka Streams.
Мое клиентское приложение основано на этом примере (практически без изменений),Мне нужно, чтобы приложение, получающее сообщения от клиента, было приложением Kafka Streams. Но заголовки сообщений, включая идентификатор корреляции, теряются.
Приложение Kafka Streams представляет собой простую топологию для тестирования этого ...
@Bean
public KafkaStreams stream(KafkaStreamsConfiguration kafkaStreamsConfiguration) {
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(REQUEST_TOPIC_NAME)
.groupByKey()
.count()
.toStream()
.mapValues((ValueMapper<Long, String>)String::valueOf)
.to(REPLY_TOPIC_NAME);
return new KafkaStreams(builder.build(), kafkaStreamsConfiguration.asProperties());
}
Для этого POC я сохраняю это простым, и клиент и сервер "соглашаются" по темеимена (kRequests
и kReplies
). Поэтому на данный момент я просто хочу, чтобы идентификатор корреляции был распознан и возвращен.
Сейчас я вижу следующее:
2019-10-01 10:55:38.792 WARN 76830 --- [TaskScheduler-1] o.s.k.r.ReplyingKafkaTemplate : Reply timed out for: ProducerRecord(topic=kRequests, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = kafka_replyTopic, value = [107, 82, 101, 112, 108, 105, 101, 115]), RecordHeader(key = kafka_correlationId, value = [101, -4, -35, 41, -127, -66, 69, 37, -117, -127, -95, -92, 38, 79, 73, 127])], isReadOnly = true), key=null, value=foo21074, timestamp=null) with correlationId: [135564972083657938538225367552235620735]
2019-10-01 10:55:38.792 ERROR 76830 --- [TaskScheduler-1] org.KRequestingApplication : Reply timed out
org.springframework.kafka.KafkaException: Reply timed out
at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.lambda$scheduleTimeout$0(ReplyingKafkaTemplate.java:257) ~[spring-kafka-2.2.8.RELEASE.jar:2.2.8.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_211]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_211]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_211]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_211]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_211]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_211]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_211]
Нет сообщения с соответствующим идентификатором корреляции натема ответа в течение тайм-аута. Похоже, что, по крайней мере, с использованием Kafka Streams DSL нет способа поддержать ReplyingKafkaTemplate
.