Spring Cloud Stream генерирует ненужные сложные топологии Кафки, почему? - PullRequest
0 голосов
/ 01 мая 2019

У меня есть приложение KStream с кучей KStreams, объединений и других операций. Я включил logging.level.org.springframework.kafka.config=debug, чтобы проверить генерируемую топологию, и обнаружил множество узлов, которые вообще не имели смысла.

Тогда я упростил приложение до следующего:

interface ShippingKStreamProcessor {

    @Input("input")
    fun input(): KStream<Int, Customer>

}

@Suppress("UNCHECKED_CAST")
@Configuration
class ShippingKStreamConfiguration {

    @StreamListener
    fun process(@Input("input") input: KStream<Int, Customer> {}

}

Как ни странно, такое простое объявление KStream генерирует эту сложную топологию:

2019-04-30 23:47:03.881 DEBUG 2944 --- [           main] o.s.k.config.StreamsBuilderFactoryBean   : Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [customer])
      --> KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
      --> KSTREAM-BRANCH-0000000003, KSTREAM-PROCESSOR-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-BRANCH-0000000003 (stores: [])
      --> KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005
      <-- KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-BRANCHCHILD-0000000004 (stores: [])
      --> KSTREAM-MAPVALUES-0000000007
      <-- KSTREAM-BRANCH-0000000003
    Processor: KSTREAM-BRANCHCHILD-0000000005 (stores: [])
      --> KSTREAM-PROCESSOR-0000000006
      <-- KSTREAM-BRANCH-0000000003
    Processor: KSTREAM-MAPVALUES-0000000007 (stores: [])
      --> none
      <-- KSTREAM-BRANCHCHILD-0000000004
    Processor: KSTREAM-PROCESSOR-0000000002 (stores: [])
      --> none
      <-- KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-PROCESSOR-0000000006 (stores: [])
      --> none
      <-- KSTREAM-BRANCHCHILD-0000000005

enter image description here

Тот же простой поток в нативном приложении Kafka приводит к более логичной топологии:

fun main(args: Array<String>) {

    val builder = StreamsBuilder()

    val streamsConfiguration = Properties()
    streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = "kafka-shipping-service"
    streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
    streamsConfiguration[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"

    val serdeConfig = mapOf(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081",
        AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY to TopicRecordNameStrategy::class.java.name
    )

    //val byteArraySerde = Serdes.ByteArray()
    val intSerde = Serdes.IntegerSerde()
    val customerSerde = SpecificAvroSerde<Customer>()
    customerSerde.configure(serdeConfig, false)

    val customerStream = builder.stream<Int, Customer>("customer",
        Consumed.with(intSerde, customerSerde)) as KStream<Int, Customer>

    val topology = builder.build()
    println(topology.describe())

    val streams = KafkaStreams(topology, streamsConfiguration)
    streams.start()
}

Топология:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [customer])
      --> none

enter image description here

В чем причина, по которой Spring Cloud Stream генерирует такую ​​сложную топологию?

1 Ответ

1 голос
/ 02 мая 2019

@ codependent Причина, по которой у вас есть эти дополнительные процессоры в топологии, заключается в том, что вы используете de / serailzers, предоставляемые платформой (для встроенного декодирования и кодирования по умолчанию установлено значение false).По сути, мы получаем данные из темы Кафки как byte[], а затем выполняем внутренние преобразования.Для этих преобразований мы пройдем несколько дополнительных процессоров, и, таким образом, вы получите более глубокую топологию.

Вот базовый StreamListener в Java (в значительной степени то, что у вас там есть, но с использованием более простого типа значения):

@StreamListener
public void process(@Input("input") KStream<Integer, String> input ) {

}

Со стандартной стандартной настройкой вСвязыватель, я смог получить ту же более глубокую топологию, что вы наблюдали.Однако, когда я изменил конфигурацию приложения, как показано ниже,

spring.cloud.stream.kafka.streams:
  binder.configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true

моя топология уменьшится, как показано ниже:

2019-05-01 18:02:12.705 DEBUG 67539 --- [           main] o.s.k.config.StreamsBuilderFactoryBean   : Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [hello-1])
      --> KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
      --> none
      <-- KSTREAM-SOURCE-0000000000

Это все равно не то же самое, что топология, полученная вами изпростое приложение Kafka Streams, но, оказывается, это то, что мы можем улучшить в связующем, чтобы избежать.Короче говоря, переключаясь на собственное декодирование и кодирование, предоставляемое Kafka Streams, вы можете избежать всех этих дополнительных уровней топологий, создаваемых связующим.

В некоторых случаях у вас нет выбора, но вы полагаетесь на десериализацию, предоставляемую Spring Cloud Stream, например, вы получаете данные от производителя, основанного на Spring Cloud Stream, который использовал некоторые специальные сериализаторы,Я думаю, что это верно в вашем случае, поскольку, насколько я помню, ваш производитель основан на Spring Cloud Stream и использует сериализатор Avro, предоставляемый фреймворком.В этом случае использование Avro Serde Kafka Stream в вашем процессоре не будет работать, поскольку эти сериализаторы несовместимы.Итак, вот некоторые из ваших вариантов.

Approcah # 1:

  1. Заставьте своих производителей использовать собственные сериализаторы, предоставленные Kafka.
  2. Затем используйте Serde, которые используют то же самоеСериализатор / десериализатор в вашем приложении Kafka Streams.

Подход № 2:

  1. Используйте сериализаторы сообщений, предоставленные SCSt.
  2. Затем используйте де / по умолчаниюСериализация, предоставляемая механизмом связывания Kafka Streams, который используется по умолчанию.

Недостатком # 2, очевидно, является то, что вы подняли выше, то есть более глубокие топологии.Это может быть хорошо в зависимости от ваших вариантов использования и пропускной способности.Если это станет реальной проблемой производительности, мы можем попытаться упростить этот процесс, когда преобразование будет выполнено платформой.

С учетом всего сказанного я создал проблему в связывателе Кафки, чтобы сделатьизменение в следующем выпуске связующего.Ваши отзывы, предложения, голоса "за" и "против" приветствуются там.

...