Кафка - весенний облачный поток - PullRequest
0 голосов
/ 22 апреля 2020

Я пытаюсь использовать spring-cloud-stream с kafka. Ниже приведен пример кода. Но, похоже, ничего не делает. Он всегда создает topi c, называемый «output». Но значения не публикуются.

application.yaml

spring.cloud.stream:
  function:
    definition: streamSupplier
  bindings:
    streamSupplier-out-0:
      destination: numbers

Моя цель - просто создать значения.

@SpringBootApplication
@EnableBinding(Source.class)
public class CloudStreamDemoApplication {

    private AtomicInteger atomicInteger = new AtomicInteger();
    public static void main(String[] args) {
        SpringApplication.run(CloudStreamDemoApplication.class, args);
    }

    @Bean
    public Supplier<Integer> streamSupplier(){
        return () -> {
            System.out.println("Publishing : " + atomicInteger.incrementAndGet());
            return atomicInteger.get();
        };
    }
}

зависимость - 2.2.6.RELEASE

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

Ответы [ 2 ]

1 голос
/ 22 апреля 2020

Вам необходимо удалить @EnableBinding(Source.class) из класса. Если это присутствует, функциональные привязки не будут иметь место.

0 голосов
/ 23 апреля 2020

Аннотация @EnableBinding вызвала проблему, как описано выше.

Прочитайте приведенные ниже выдержки из весенних документов:

Unlike previous versions of spring-cloud-stream which relied on @EnableBinding and @StreamListener annotations, the above example looks no different then any vanilla spring-boot application. It defines a single bean of type Function and that it is. So, how does it became spring-cloud-stream application? It becomes spring-cloud-stream application simply based on the presence of spring-cloud-stream and binder dependencies and auto-configuration classes on the classpath effectively setting the context for your boot application as spring-cloud-stream application. And in this context beans of type Supplier, Function or Consumer are treated as defacto message handlers triggering binding of to destinations exposed by the provided binder following certain naming conventions and rules to avoid extra configuration.

...