Параметр timestampExtractorBeanName в приложении Spring Cloud Stream не отменяет значение по умолчанию - PullRequest
0 голосов
/ 27 мая 2020

У меня есть следующие свойства для моего приложения Spring Cloud Stream, которое использует Kafka Streams Binder:

spring.cloud.stream.bindings:
  windowStream-in-0:
    destination: input
  windowStream-out-0:
    destination: window1
  hint1Stream-in-0:
    destination: window1
  hint1Stream-out-0:
    destination: hints
  realityStream-in-0:
    destination: input
  realityStream-in-1:
    destination: window1
    consumer:
      timestampExtractorBeanName: anotherTimestampExtractor
  realityStream-out-0:
    destination: hints
  countStream-in-0:
    destination: hints

spring.cloud.stream.kafka.streams:
  default:
    consumer:
      timestampExtractorBeanName: timestampExtractor
  binder:
    functions:
      windowStream:
        applicationId: mock-stream-window1
      hint1Stream:
        applicationId: mock-stream-hints
      realityStream:
        applicationId: mock-stream-reality
      countStream:
        applicationId: mock-stream-count
    stateStoreRetry:
      maxAttempts: 3
      backOffInterval: 1000
    configuration:
      schema.registry.url: mock://mock-stream-registry
      default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      commit.interval.ms: 100

Я пытаюсь использовать «timestampExtractor» для всех потоков, кроме одного, который называется "realityStream".

Для этого я установил spring.cloud.stream.kafka.streams.default.consumer.timestampExtractorBeanName на timestampExtractor, а затем попытался «переопределить» его для «realityStream», установив spring.cloud.stream.bindings.realityStream-in-1.consumer.timestampExtractorBeanName

К сожалению, кажется что мое переопределение не работает, поскольку вызывается только «timestampExtractor», как я могу видеть в отладчике (и в результатах моих тестов).

Я неправильно применяю конфигурацию или имею неверное ожидание ?

Вот изображение потоков в моем единственном приложении Spring Cloud Streams:

enter image description here

(оранжевый кружок - это место, где я хотите применить средство извлечения временных меток не по умолчанию)

1 Ответ

2 голосов
/ 27 мая 2020

Переопределение не в том месте; он должен находиться под свойством ....kafka.streams.bindings.realityStream-in-1.....

Это свойство c, специфичное для кафки; он у вас есть в общих свойствах привязок c (общих для всех привязок).

...