Как подключить функциональные компоненты Spring Cloud Stream к Binder Kafka? - PullRequest
0 голосов
/ 25 марта 2020

Я использую Spring Cloud Streams Documentation , чтобы попытаться выяснить, как подключить мой микро-сервис к Kafka через связыватель, уже загруженный в Gradle. Я попытался создать простой метод @Bean Function<String, String>() в своем классе Spring Boot Application и убедился, что он может общаться с Kafka, используя командную строку для взаимодействия с темами uppercase-in-0 и uppercase-out-0, как описано в начало документации, подтверждающей, что приложение способно общаться с Кафкой. В этот момент я попытался создать следующий класс с ожиданием загрузки через автообнаружение:

package com.yuknis.loggingconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class LoggingConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

}

package com.yuknis.loggingconsumer.functions;

import java.util.function.Function;

public class CharCounter implements Function<String, Integer> {

    /**
     * Applies this function to the given argument.
     *
     * @param s the function argument
     * @return the function result
     */
    @Override
    public Integer apply(String s) {
        return s.length();
    }

}

С файлами application.properties как таковыми:

spring.cloud.function.scan.packages:com.yuknis.loggingconsumer.functions

Я не уверен на 100%, что должно произойти сейчас, но я предполагаю, что он должен увидеть класс и автоматически создать charcounter-out-0 и charcounter-in-0 topi c, которые я мог бы использовать и опубликовать sh, с данными в этих темах, проходящих через эту функцию. Это не то, что происходит. Чего мне не хватает? Предполагается ли, что этот класс создает топи c так же, как @Bean?

1 Ответ

0 голосов
/ 25 марта 2020

Несмотря на то, что каждая из функций загружена с spring.cloud.function.scan.packages, установленным на пакет, и spring.cloud.function.scan.enabled, установленным на true, она по-прежнему не создает темы. Вам все еще нужно будет установить spring.cloud.function.scan.definition на Function, Consumer или Supplier, с которым вы хотите общаться с Кафкой, например:

spring.cloud.function:
  scan:
    enabled: true
    packages: com.yuknis.loggingconsumer.functions
  definition: charCounter;lowercase;uppercase

После этого он будет создайте темы charCounter-in-0 и charCounter-out-0, которые при необходимости можно сопоставить с помощью свойства выражения spring.cloud.function.charCounter-in-0 или spring.cloud.function.charCounter-out-0.

...