У меня есть следующие свойства для моего приложения Spring Cloud Stream, которое использует Kafka Streams Binder:
spring.cloud.stream.bindings:
windowStream-in-0:
destination: input
windowStream-out-0:
destination: window1
hint1Stream-in-0:
destination: window1
hint1Stream-out-0:
destination: hints
realityStream-in-0:
destination: input
realityStream-in-1:
destination: window1
consumer:
timestampExtractorBeanName: anotherTimestampExtractor
realityStream-out-0:
destination: hints
countStream-in-0:
destination: hints
spring.cloud.stream.kafka.streams:
default:
consumer:
timestampExtractorBeanName: timestampExtractor
binder:
functions:
windowStream:
applicationId: mock-stream-window1
hint1Stream:
applicationId: mock-stream-hints
realityStream:
applicationId: mock-stream-reality
countStream:
applicationId: mock-stream-count
stateStoreRetry:
maxAttempts: 3
backOffInterval: 1000
configuration:
schema.registry.url: mock://mock-stream-registry
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
commit.interval.ms: 100
Я пытаюсь использовать «timestampExtractor» для всех потоков, кроме одного, который называется "realityStream".
Для этого я установил spring.cloud.stream.kafka.streams.default.consumer.timestampExtractorBeanName
на timestampExtractor
, а затем попытался «переопределить» его для «realityStream», установив spring.cloud.stream.bindings.realityStream-in-1.consumer.timestampExtractorBeanName
К сожалению, кажется что мое переопределение не работает, поскольку вызывается только «timestampExtractor», как я могу видеть в отладчике (и в результатах моих тестов).
Я неправильно применяю конфигурацию или имею неверное ожидание ?
Вот изображение потоков в моем единственном приложении Spring Cloud Streams:
(оранжевый кружок - это место, где я хотите применить средство извлечения временных меток не по умолчанию)