Функциональное программирование: определение бина и интеграция функции «весна-облако» + интеграция «весна-облако-поток» - PullRequest
0 голосов
/ 09 июня 2019

Привет всем и специально команде Spring!

Как я могу передать функцию spring-cloud с помощью cloud-cloud-stream в стиле функциональной модели программирования Bean?

Например, у меня есть pom.xml с обеими зависимостями:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-function-webflux</artifactId>
</dependency>

и, скажем, я хотел бы сделать следующее:

  1. отправить через строку полезной нагрузки http через функцию spring-cloud (webflux)
  2. прописать его в верхнем регистре, используя мою функцию toUpperCase
  3. и, наконец, отправить в мой конвейер вперёд к установленному связующему (kafka / rabbit / test-binder)

, поэтому я ожидаю реализацииэто так:

@Log4j2
@SpringBootApplication
public class SpringCloudFunctionStreamApplication {

  /**
   * can I sent result of that function to my broker without any
   * explicitly defined output.send(...) execution?
   */
  @Bean
  public Function<String, String> toUpperCase() {
    return arg -> {
      var res = arg.toUpperCase();
      log.info("toUpperCase: {}", res);
      return res;
    };
  }

  public static void main(String[] args) {
    SpringApplication.run(
      SpringCloudFunctionStreamApplication.class,
      "--spring.cloud.function.definition=toUpperCase",
      "--spring.cloud.stream.function.definition=toUpperCase"
    );
  }
}

поэтому, когда я использую HTTPie для отправки полезной нагрузки, вот так:

echo 'hello' | http :8080/toUpperCase

spring-cloud-function, кажется, работает нормально, и я могусм. ожидаемый журнал:

2019-06-09 21:20:36.978 ...SpringCloudFunctionStreamApplication : toUpperCase: hello

То же самое, если я публикую сообщение через веб-интерфейс управления rabbitmq, но как я могу передавать данные от одного к другому

Так что мой вопрос связан с в соответствии с весенней документацией, в которой говорится, что я могу использовать spring-cloud-streamа также: Оболочки для @Beans типа Function, Consumer и Supplier, выставляющие их внешнему миру в качестве конечных точек HTTP и / или слушателей / издателей потока сообщений с RabbitMQ, Kafka и т. д., но я не могу понять, как?

На данный момент, к сожалению, я могу только вручную публиковать сообщения в связыватель spring-cloud-stream, используя Source , см. Пример здесь , но это, конечно, то, что я хочу знать, если можно избежатьс весной, волшебным образом ...

Может кто-нибудь сказать мне (может быть, Гари Рассел, Дейв Сойер, Артем Билан, Олег Жураковский или кто-нибудь еще, кто знает): что я пропустил и как я должен настроить свое приложение иликакие реквизиты я должен добавить в мои application.properties и т. д.?

Спасибо!


С уважением, Максим

Ответы [ 2 ]

0 голосов
/ 01 июля 2019

Это скорее вопрос к Олегу Жураковскому. Был бы рад если бы ответил

Если я использую @Bean Supplier<Pojo>... для привязки выходного пункта назначения, как вызывать его из класса @Service или @Controller каждый раз, когда новый Pojo отправляется в Kafka / Rabbit.

Supplier предоставляет только метод get().

Я пишу только производителю, который напишет пользовательский Pojo для Кафки, а другое приложение - Потребитель. Функциональный подход более понятен для Consumer<Pojo>..., где он будет просто читать из Кафки и обрабатывать. Supplier<Pojo>... часть для производителя не ясна.

https://www.youtube.com/watch?v=nui3hXzcbK0&t=3478s

0 голосов
/ 11 июня 2019

Максим

Это не идеально, но, учитывая, что мы реализовали начальную поддержку функций в рамках существующих связывателей, существуют некоторые ограничения. Я объясню, но сначала вот полностью функционирующий код:

@SpringBootApplication
public class SimpleFunctionRabbitDemoApplication  {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(SimpleFunctionRabbitDemoApplication.class,
            "--spring.cloud.stream.function.definition=uppercase");
    }

    @Autowired
    private Processor processor;

    @Bean
    public Consumer<String> consume() {
        return v -> processor.input().send(MessageBuilder.withPayload(v).build());
    }

    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}

В основном, есть небольшое несоответствие. На стороне потока у нас есть связующие, а на стороне функции - адаптеры. Вы эффективно (с вашим требованием) пытаетесь соединить их в конвейер. Так. , ,

Давайте сначала посмотрим на связующие.

Функция в верхнем регистре связана с input и output каналами, предоставляемыми связывателями каналов сообщений (кролик или кафка), эффективно создавая внутренний конвейер input -> uppercase -> output. Она также отображается как конечная точка REST с помощью s-c-функции, однако s-c-функция не имеет доступа к упомянутому конвейеру. На самом деле у него есть собственный конвейер request -> uppercase -> reply. Итак, что нам нужно сделать, это соединить две концепции вместе, и именно это я и сделал.

  • Вы добавляете в свое приложение привязку Processor, которая содержит ссылки на каналы, с которыми связан uppercase.

  • Вы вызываете consume() через REST http://localhost:8080/consume/blah.

  • Вы отправляете сообщение на входной канал функции uppercase

В будущем, чтобы упростить это, нам просто нужно создать версию веб-адаптера в виде переплета, поэтому, пожалуйста, не стесняйтесь подавать запрос на функцию. Но, как вы видите, текущий обходной путь - это еще не все.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...