Мой вариант использования - это вариант:
Создание потока с одним источником, двумя параллельными процессорами и одним приемником в Spring Cloud Data Flow
В этом примере 1 источник посылает элемент в rabbitmq, и оба процессора получают его.
Я хочу обратного. Я хочу, чтобы источник отправлял элементы в rabbitmq, но только 1 процессор обрабатывает каждый элемент.
Давайте представим, что у меня есть:
1 источник с именем источник
2 процессора с именами процессор1 и процессор2
Итак, источник излучает: A, B, C в rabbitmq
RabbitMQ будет излучать A
Какой бы процессор ни получил Первый обработает его - скажем, процессор1 является счастливчиком и обрабатывает A.
Тогда RabbitMQ будет излучать B
Поскольку процессор1 занят А, а процессор2 находится в режиме ожидания, процессор2 обрабатывает B
RabbitMQ будет испускать C
процессор1 закончил с A и находится в режиме ожидания, поэтому процессор1 обрабатывает C
График потока данных Spring Cloud, который я придумал:
процессор A - один сверху, процессор B - нижний
Когда я развертываю и запускаю его, источник излучает A, B и C, затем и процессор1, и процессор2 получают A, B и затем C
Я в замешательстве, если желаемое поведение может быть реализовано в Spring Cloud Data Flow ИЛИ если для этого есть настройка RabbitMQ, что подразумевается в ответе об удалении сообщения
"это то, что происходит, когда вы устанавливаете флаг автоматического подтверждения. Таким образом, сообщение подтверждается, как только оно израсходовано - так что оно ушло из очереди."
Если это так, могу ли я установить его в своем источнике потока данных Spring Cloud ИЛИ это настройка RabbitMQ или это что-то совсем другое
UPDATE:
Я добавил
spring.cloud.stream.bindings.input.group=consumerGroup
в файл application.properties моего процессора.
К сожалению, оба процессора получают одни и те же данные.
Нужно ли добавлять аналогичную запись в application.properties моего источника?
Нужно ли менять аннотацию на процессоре? В настоящее время это:
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
Нужно ли каким-либо образом изменять аннотацию на источнике? В настоящее время это:
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller =
@Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
Изменит ли это @Poller?
UPDATE:
Является ли свойство с именем spring.cloud.stream.instanceCount?