маршрут ОТ и маршрут КО с весенним облачным потоком и функциями - PullRequest
0 голосов
/ 27 февраля 2020

У меня есть некоторые проблемы с новой функцией маршрутизации в весеннем облачном потоке

Я попытался реализовать простой сценарий, я хочу отправить сообщение с заголовком spring.cloud.function.definition = потребление1 или потребление2

Я ожидаю, что require1 или потребление2 должны вызываться на основе того, что отправлено в заголовке, но методы вызываются случайным образом.

Я отправляю сообщение получателю обмена с помощью административной консоли кролика

У меня есть следующие журналы:

2020-02-27 14:48:25.896  INFO 22132 --- [ consumer.app-1] com.example.demo.TestConsumer            : ==============>consume1 messge [[payload=ok, headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=#, amqp_receivedExchange=consumer, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=consumer.app, amqp_redelivered=false, id=9a4dff25-88ef-4d76-93e2-c8719cda122d, spring.cloud.function.definition=consume1, amqp_consumerTag=amq.ctag-gGChFNCKIVd25yyR9H6-fQ, sourceData=(Body:'[B@3a92faa7(byte[2])' MessageProperties [headers={spring.cloud.function.definition=consume1}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=consumer, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-gGChFNCKIVd25yyR9H6-fQ, consumerQueue=consumer.app]), timestamp=1582811303347}]]
2020-02-27 14:48:25.984  INFO 22132 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-02-27 14:48:25.984  INFO 22132 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2020-02-27 14:48:25.991  INFO 22132 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 7 ms
2020-02-27 14:48:26.037  INFO 22132 --- [oundedElastic-1] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel customer-1
2020-02-27 14:48:26.111  INFO 22132 --- [oundedElastic-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.customer-1' has 1 subscriber(s).
2020-02-27 14:48:26.116  INFO 22132 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2020-02-27 14:48:26.123  INFO 22132 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#32438e24:0/SimpleConnection@3e58666d [delegate=amqp://guest@127.0.0.1:5672/, localPort= 62514]
2020-02-27 14:48:26.139  INFO 22132 --- [-1.customer-1-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-02-27 14:48:26.140  INFO 22132 --- [-1.customer-1-1] com.example.demo.TestSink                : Data received customer-1...body
2020-02-27 14:49:14.185  INFO 22132 --- [ consumer.app-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-02-27 14:49:14.194  INFO 22132 --- [ consumer.app-1] com.example.demo.TestConsumer            : ==============>consume2 messge [[payload=ok, headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=#, amqp_receivedExchange=consumer, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=consumer.app, amqp_redelivered=false, id=33581edb-2832-1c92-b765-a05794512b34, spring.cloud.function.definition=consume1, amqp_consumerTag=amq.ctag-RIp2nZdcG2a0hNQeImwtBw, sourceData=(Body:'[B@8159793(byte[2])' MessageProperties [headers={spring.cloud.function.definition=consume1}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=consumer, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-RIp2nZdcG2a0hNQeImwtBw, consumerQueue=consumer.app]), timestamp=1582811354186}]]
2020-02-27 14:49:14.203  INFO 22132 --- [oundedElastic-1] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel customer-2
2020-02-27 14:49:14.213  INFO 22132 --- [oundedElastic-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.customer-2' has 1 subscriber(s).
2020-02-27 14:49:14.216  INFO 22132 --- [-2.customer-2-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-02-27 14:49:14.216  INFO 22132 --- [-2.customer-2-1] com.example.demo.TestSink                : Data received customer-2...body

application.yml

spring:
  main:
    allow-bean-definition-overriding: true
spring.cloud.stream:
  function.definition: supplier;receive1;receive2;consume1;consume2
  function.routing:
    enabled: true

  bindings:
    consume1-in-0.destination: consumer
    consume1-in-0.group: app
    consume2-in-0.destination: consumer
    consume2-in-0.group: app
    receive1-in-0.destination: customer-1
    receive1-in-0.group: customer-1
    receive2-in-0.destination: customer-2
    receive2-in-0.group: customer-2

DemoApplication. java

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.commons.logging.Log
import org.apache.commons.logging.LogFactory
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.http.HttpStatus
import org.springframework.messaging.Message
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestMethod.GET
import org.springframework.web.bind.annotation.ResponseStatus
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.client.RestTemplate
import reactor.core.publisher.EmitterProcessor
import reactor.core.publisher.Flux
import java.util.function.Consumer
import java.util.function.Supplier


@SpringBootApplication
class DemoApplication

fun main(args: Array<String>) {
    runApplication<DemoApplication>(*args)
}

@RestController
class DynamicDestinationController(private val jsonMapper: ObjectMapper) {

    private val processor: EmitterProcessor<Message<String>> = EmitterProcessor.create<Message<String>>()

    @RequestMapping(path = ["/api/dest/{destName}"], method = [GET], consumes = ["*/*"])
    @ResponseStatus(HttpStatus.ACCEPTED)
    fun handleRequest(@PathVariable destName:String) {
        val message: Message<String> = MessageBuilder.withPayload("body")
                .setHeader("spring.cloud.stream.sendto.destination", destName).build()
        processor.onNext(message)
    }

    @Bean
    fun supplier(): Supplier<Flux<Message<String>>> {
        return Supplier { processor }
    }
}

const val destResourceUrl = "http://localhost:8080/api/dest"
@Component
class TestConsumer() {

    private val restTemplate: RestTemplate = RestTemplate()
    private val logger: Log = LogFactory.getLog(javaClass)

    @Bean
    fun consume1(): Consumer<Message<String>> = Consumer {
        logger.info("==============>consume1 messge [[payload=${it.payload}, headers=${it.headers}]]")
        restTemplate.getForEntity("$destResourceUrl/customer-1", String::class.java)
    }

    @Bean
    fun consume2(): Consumer<Message<String>> = Consumer {
        logger.info("==============>consume2 messge [[payload=${it.payload}, headers=${it.headers}]]")
        restTemplate.getForEntity("$destResourceUrl/customer-2", String::class.java)
    }
}


@Component
class TestSink {
    private val logger: Log = LogFactory.getLog(javaClass)
    @Bean
    fun receive1(): Consumer<String> = Consumer {
        logger.info("Data received customer-1..." + it);
    }

    @Bean
    fun receive2(): Consumer<String> = Consumer {
        logger.info("Data received customer-2..." + it);
    }
}

Любая идея как исправить маршрут до потребителя?

заранее спасибо.

демо-репо

Ответы [ 2 ]

2 голосов
/ 27 февраля 2020

На самом деле я немного запутался, поэтому давайте сделаем один шаг за раз. Вот работающее (смоделированное по вашему) приложение, которое использует функцию sendto , позволяющую отправлять сообщения по указанным c (существующим и / или динамически разрешаемым) адресатам.

(в java но вы можете переделать его в Kotlin)

@Controller
public class WebSourceApplication {

    public static void main(String[] args) {
        SpringApplication.run(WebSourceApplication.class,
                "--spring.cloud.function.definition=supplier;consA;consB",
                "--spring.cloud.stream.bindings.consA-in-0.destination=consumerA",
                "--spring.cloud.stream.bindings.consA-in-0.group=consumerA-grp",
                "--spring.cloud.stream.bindings.consB-in-0.destination=consumerB",
                "--spring.cloud.stream.bindings.consB-in-0.group=consumerB-grp"
                );
    }

    EmitterProcessor<Message<String>> processor = EmitterProcessor.create();

    @RequestMapping(path = "/api/dest/{destName}", consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void delegateToSupplier(@RequestBody String body, @PathVariable String destName) {
        Message<String>  message = MessageBuilder.withPayload(body)
            .setHeader("spring.cloud.stream.sendto.destination", destName)
            .build();
        processor.onNext(message);
    }

    @Bean
    public Supplier<Flux<Message<String>>> supplier() {
        return () -> processor;
    }

    @Bean
    public Consumer<String> consA() {
        return v -> {
            System.out.println("Consuming from consA:  " + v);
        };
    }

    @Bean
    public Consumer<String> consB() {
        return v -> {
            System.out.println("Consuming from consB:  " + v);
        };
    }
}

И когда я сверну его, я получу последовательный вызов для соответствующего потребителя:

curl -H "Content-Type: application/json" -X POST -d "Hello Spring Cloud Stream" http://localhost:8080/api/dest/consumerA
log: Consuming from consA:  Hello Spring Cloud Stream
. . .

curl -H "Content-Type: application/json" -X POST -d "Hello Spring Cloud Stream" http://localhost:8080/api/dest/consumerB
log: Consuming from consB:  Hello Spring Cloud Stream

Примечание: нет включения свойство маршрутизации. Эта функция в основном предназначена для того, чтобы всегда вызывать одну функцию functionRouter и вызывать другие функции от вашего имени. Это функция функции spring-cloud, которая означает, что она работает за пределами Spring-cloud-srteam и каналов / направлений и т. Д. c.

Разве это не то, что вы пытаетесь выполнить sh? Отправить сообщение в другое место назначения на основе какой-либо переменной присяги в вашем HTTP-запросе?

1 голос
/ 27 февраля 2020

Вот пример другого микросервиса, который получает при маршрутизации функцию, которая переходит к различным функциям

public class FunctionRoutingApplication {

    public static void main(String[] args) {
        SpringApplication.run(FunctionRoutingApplication.class,
                "--spring.cloud.stream.function.routing.enabled=true"
                );
    }

    @Bean
    public Consumer<String> consA() {
        return v -> {
            System.out.println("Consuming from consA:  " + v);
        };
    }

    @Bean
    public Consumer<String> consB() {
        return v -> {
            System.out.println("Consuming from consB:  " + v);
        };
    }
}

И это в значительной степени так. Go вашему брокеру и отправьте данные на functionRouter-in-0 exchange, указав spring.cloud.function.definition=consA/consB заголовки, и вы увидите последовательные вызовы.

Я все еще что-то упускаю?

...