Возможен ли весенний облачный поток KStream нескольких @StreamLisener? - PullRequest
1 голос
/ 13 марта 2019

Я использую Sprin облачную стрема Kstream.Я тестирую одну тему и одну @StreamListner.Это нормально.

Я изменяю свой код для двух входов KStream.(два @StreamListener) Но, ошибка весеннего облака ..


***************************
APPLICATION FAILED TO START
***************************

Description:

The bean 'stream-builder-process', defined in null, could not be registered. A bean with that name has already been defined in null and overriding is disabled.

Action:

Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true


Process finished with exit code 1

Первый слушатель


    package com.kstream.spring.cloud.test1;

    import static com.kstream.spring.cloud.test1.MyBinding.TOPIC1_IN;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;

    @Component
    public class Topic1Source {

      @StreamListener
      public void process(@Input(TOPIC1_IN) KStream<String, GenericRecord> logs) {

        logs
            .foreach((key, value) -> {
              System.out.println("Test Topic1 : " + value);
            });
      }
    }

Только Первый слушатель в порядке.

Второй слушатель


    package com.kstream.spring.cloud.test1;

    import static com.kstream.spring.cloud.test1.MyBinding.TOPIC2_IN;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;

    @Component
    public class Topic2Source {

      @StreamListener
      public void process(@Input(TOPIC2_IN) KStream<String, GenericRecord> logs) {

        logs
            .foreach((key, value) -> {
              System.out.println("Test Topic2 : " + value);
            });
      }
    }

Но это ошибка

application.properties

spring.application.name=kafka-streams-test
spring.kafka.bootstrap-servers=my brokers

# defaults
spring.cloud.stream.kafka.streams.binder.brokers=my brokers
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url=my server


# topic1
spring.cloud.stream.bindings.topic1In.destination=topic1
spring.cloud.stream.bindings.topic1In.consumer.useNativeDecoding=true
spring.cloud.stream.bindings.topic1In.consumer.header-mode=raw
spring.cloud.stream.kafka.streams.bindings.topic1In.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.topic1In.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde




# topic2
spring.cloud.stream.bindings.topic2In.destination=topic2
spring.cloud.stream.bindings.topic2In.consumer.useNativeDecoding=true
spring.cloud.stream.bindings.topic2In.consumer.header-mode=raw
spring.cloud.stream.kafka.streams.bindings.topic2In.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.topic2In.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

Ответы [ 4 ]

1 голос
/ 25 марта 2019

Я нахожу причину ошибки.Потому что я определил два одинаковых имени метода 'process'.

0 голосов
/ 14 марта 2019

Я изменяю версию pom.xml.Работает.Но эта версия не использует свойства идентификатора приложения.

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <!--<version>2.1.3.RELEASE</version>-->
        <version>2.0.1.BUILD-SNAPSHOT</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>


    <properties>
        <java.version>1.8</java.version>
        <!--<spring-cloud.version>Greenwich.SR1</spring-cloud.version>-->
        <spring-cloud.version>Finchley.BUILD-SNAPSHOT</spring-cloud.version>
    </properties>

spring-boot-starter-parent версия

  • 2.1.3.RELEASE => Ошибка
  • 2.0.1.BUILD-SNAPSHOT => Работающий

spring-cloud.version

  • Greenwich.SR1 => Ошибка
  • Finchley.BUILD-SNAPSHOT => Работает

Я не понимаю, почему не работает последняя версия.

0 голосов
/ 19 марта 2019

Я думаю, вы могли бы попробовать изменить "logs" на "logs2" для второго слушателя.

Спасибо, Du

0 голосов
/ 13 марта 2019

Вам необходимо предоставить отдельные идентификаторы приложения для обоих входов.См. этот вопрос и ответ.

...