@ codependent Причина, по которой у вас есть эти дополнительные процессоры в топологии, заключается в том, что вы используете de / serailzers, предоставляемые платформой (для встроенного декодирования и кодирования по умолчанию установлено значение false
).По сути, мы получаем данные из темы Кафки как byte[]
, а затем выполняем внутренние преобразования.Для этих преобразований мы пройдем несколько дополнительных процессоров, и, таким образом, вы получите более глубокую топологию.
Вот базовый StreamListener
в Java (в значительной степени то, что у вас там есть, но с использованием более простого типа значения):
@StreamListener
public void process(@Input("input") KStream<Integer, String> input ) {
}
Со стандартной стандартной настройкой вСвязыватель, я смог получить ту же более глубокую топологию, что вы наблюдали.Однако, когда я изменил конфигурацию приложения, как показано ниже,
spring.cloud.stream.kafka.streams:
binder.configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true
моя топология уменьшится, как показано ниже:
2019-05-01 18:02:12.705 DEBUG 67539 --- [ main] o.s.k.config.StreamsBuilderFactoryBean : Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [hello-1])
--> KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
--> none
<-- KSTREAM-SOURCE-0000000000
Это все равно не то же самое, что топология, полученная вами изпростое приложение Kafka Streams, но, оказывается, это то, что мы можем улучшить в связующем, чтобы избежать.Короче говоря, переключаясь на собственное декодирование и кодирование, предоставляемое Kafka Streams, вы можете избежать всех этих дополнительных уровней топологий, создаваемых связующим.
В некоторых случаях у вас нет выбора, но вы полагаетесь на десериализацию, предоставляемую Spring Cloud Stream, например, вы получаете данные от производителя, основанного на Spring Cloud Stream, который использовал некоторые специальные сериализаторы,Я думаю, что это верно в вашем случае, поскольку, насколько я помню, ваш производитель основан на Spring Cloud Stream и использует сериализатор Avro, предоставляемый фреймворком.В этом случае использование Avro Serde
Kafka Stream в вашем процессоре не будет работать, поскольку эти сериализаторы несовместимы.Итак, вот некоторые из ваших вариантов.
Approcah # 1:
- Заставьте своих производителей использовать собственные сериализаторы, предоставленные Kafka.
- Затем используйте Serde, которые используют то же самоеСериализатор / десериализатор в вашем приложении Kafka Streams.
Подход № 2:
- Используйте сериализаторы сообщений, предоставленные SCSt.
- Затем используйте де / по умолчаниюСериализация, предоставляемая механизмом связывания Kafka Streams, который используется по умолчанию.
Недостатком # 2, очевидно, является то, что вы подняли выше, то есть более глубокие топологии.Это может быть хорошо в зависимости от ваших вариантов использования и пропускной способности.Если это станет реальной проблемой производительности, мы можем попытаться упростить этот процесс, когда преобразование будет выполнено платформой.
С учетом всего сказанного я создал проблему в связывателе Кафки, чтобы сделатьизменение в следующем выпуске связующего.Ваши отзывы, предложения, голоса "за" и "против" приветствуются там.