Хотя я ценю, что вы публикуете проект, к сожалению, ваша история продолжает меняться, и я все еще не уверен, чего вы хотите достичь sh. Итак, это мой последний ответ, но я постараюсь быть как можно более подробным и информативным, поэтому вот что я вижу из вашего проекта.
- Ваша конфигурация неверна. Свойство
definition
для функций должно spring.cloud.function.definition
. , .
spring:
cloud:
function:
definition: generate;process;sink
. , .
Поскольку вы используете ;
Я предполагаю, что вы хотите, чтобы все 3 функции были связаны независимо (без композиции функций), как описано в разделе множественное связывание .
spring.cloud.stream.function.bindings
- это свойство, которое позволяет сопоставить сгенерированное имя привязки с именем настраиваемой привязки, как описано в Имена привязок функций . Это не имеет ничего общего с названиями фактических мест назначения. Для этого у нас есть свойство destination
, которое также рассматривается в указанном разделе (например, --spring.cloud.stream.bindings.generate-out-0.destination = source1). Однако, если свойство destination
не используется, имя привязки и имя назначения предполагаются одинаковыми. Однако для получателя также требуется имя группы, и если оно не указано, оно генерирует его. Итак, в зависимости от вашей конфигурации ваш generate-out-0
поставщик обязан source1
exchange :
![enter image description here](https://i.stack.imgur.com/KdrXam.png)
Ввод функции process-in-in
с другой стороны связан с source1.anonymous...
очередью :
![enter image description here](https://i.stack.imgur.com/B5Yv7m.png)
И как Я уже говорил ранее, что не существует привязки RabbitMQ между source1
exchange и source1.anonymous...
queue , поэтому сообщения, отправляемые на source1
exchange , просто отбрасываются , При создании такой привязки (например, через консоль Rabbit MQ) сообщения доходят до потребителя.
При этом такой дизайн очень неэффективен. Почему вы хотите отправить и получить от того же получателя, находясь в том же пространстве процесса (JVM)? Зачем ругать сеть, когда можно просто перейти по ссылке? Поэтому, по крайней мере, изменив definition
на spring.cloud.function.definition = generate | process | sink`. Лучшим решением было бы просто написать свой код в самом поставщике
public void emitData(String str) {
String uppercased = str.toUpperCase();
sourceGenerator.onNext(uppercased);
System.out.println("Emitted: " + str);
}
и покончить с этим. В любом случае, я настоятельно рекомендую вам go ознакомиться с нашим руководством пользователя, в частности с разделом Основные понятия и с разделом Модель программирования , так как я считаю, что вы неправильно поняли некоторые основные понятия, которые, как я считаю, внести свой вклад в несоответствия как в вашем посте, так и в ваших вопросах.