Несколько @EnableBinding с Kafka Spring Cloud Stream - PullRequest
0 голосов
/ 04 февраля 2019

Я пытаюсь установить Spring Boot Application для прослушивания Kafka.

Я использую Kafka Streams Binder.

С одним простым @EnableBinding

@EnableBinding(StreamExample.StreamProcessor.class)
public class StreamExample {

    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) {

        logger.info("Stream listening");

        return input
                .peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
    }

    interface StreamProcessor {

        String INPUT = "input_1";
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}

и в application.yml

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: localhost:29092
      bindings:
         input_1:
           destination: mytopic1
           group: readgroup
         output_1:
           destination: mytopic2
         input_2:
           destination: mytopic3
           group: readgroup
         output_2:
           destination: mytopic4
  application:
    name: stream_s1000_app

Все отлично работает.

Но если я пытаюсь добавить второй класс с другим связыванием, возникает следующая ошибка:

Следующие подписанные темы не назначаются ни одному из участников: [mytopic1]

Пример второй привязки:

@EnableBinding(StreamExampleBindingTwo.StreamProcessor.class)
public class StreamExampleBindingTwo {

    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) {

        logger.info("Stream listening binding two");

        return input
                .peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
    }

    interface StreamProcessor {

        String INPUT = "input_2";
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}

Что мне не хватает?Не могу ли я использовать несколько тем ввода и несколько выводов в одном приложении?Есть что-то связанное с application.name?

Ответы [ 2 ]

0 голосов
/ 05 февраля 2019

Я только что попробовал приложение, и это сработало.Если у вас есть несколько процессоров в одном приложении, вы должны убедиться, что каждый процессор получает свой собственный идентификатор приложения.Ниже показано, как у меня есть 2 разных идентификатора приложения для обоих входов в application.yml.

. Я видел, как оба процессора регистрируются в консоли.Также видел сообщения на темы вывода.

@SpringBootApplication
@EnableBinding({So54522918Application.StreamProcessor1.class, So54522918Application.StreamProcessor2.class})
public class So54522918Application {

    public static void main(String[] args) {
        SpringApplication.run(So54522918Application.class, args);
    }

    @StreamListener(StreamProcessor1.INPUT)
    @SendTo(StreamProcessor1.OUTPUT)
    public KStream<String, String> process1(KStream<String, String> input) {

        System.out.println("Stream listening");

        return input
                .peek(((key, value) -> System.out.println("key = " + key +", value = " + value)));
    }

    @StreamListener(StreamProcessor2.INPUT)
    @SendTo(StreamProcessor2.OUTPUT)
    public KStream<String, String> process2(KStream<String, String> input) {

        System.out.println("Stream listening binding two");

        return input
                .peek(((key, value) -> System.out.println("key = " + key +", value = " + value)));
    }

    interface StreamProcessor1 {

        String INPUT = "input_1";
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }

    interface StreamProcessor2 {

        String INPUT = "input_2";
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }

}

Соответствующая часть application.yml

spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
  binder.configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  bindings.input_1.consumer.application-id: process-1
  bindings.input_2.consumer.application-id: process-2
spring.cloud.stream.bindings.input_1:
  destination: mytopic1
spring.cloud.stream.bindings.output_1:
  destination: mytopic2
spring.cloud.stream.bindings.input_2:
  destination: mytopic3
spring.cloud.stream.bindings.output_2:
  destination: mytopic4
0 голосов
/ 04 февраля 2019

Попробуйте

@EnableBinding( { StreamExample.StreamProcessor.class, StreamExampleBindingTwo.StreamProcessor.class })
...