один источник SCDF, 2 процессора, но только 1 обрабатывает каждый элемент - PullRequest
0 голосов
/ 15 января 2019

Мой вариант использования - это вариант:

Создание потока с одним источником, двумя параллельными процессорами и одним приемником в Spring Cloud Data Flow

В этом примере 1 источник посылает элемент в rabbitmq, и оба процессора получают его.

Я хочу обратного. Я хочу, чтобы источник отправлял элементы в rabbitmq, но только 1 процессор обрабатывает каждый элемент.

Давайте представим, что у меня есть:

1 источник с именем источник 2 процессора с именами процессор1 и процессор2

Итак, источник излучает: A, B, C в rabbitmq

RabbitMQ будет излучать A

Какой бы процессор ни получил Первый обработает его - скажем, процессор1 является счастливчиком и обрабатывает A.

Тогда RabbitMQ будет излучать B

Поскольку процессор1 занят А, а процессор2 находится в режиме ожидания, процессор2 обрабатывает B

RabbitMQ будет испускать C

процессор1 закончил с A и находится в режиме ожидания, поэтому процессор1 обрабатывает C

График потока данных Spring Cloud, который я придумал:

enter image description here

процессор A - один сверху, процессор B - нижний

Когда я развертываю и запускаю его, источник излучает A, B и C, затем и процессор1, и процессор2 получают A, B и затем C

Я в замешательстве, если желаемое поведение может быть реализовано в Spring Cloud Data Flow ИЛИ если для этого есть настройка RabbitMQ, что подразумевается в ответе об удалении сообщения

"это то, что происходит, когда вы устанавливаете флаг автоматического подтверждения. Таким образом, сообщение подтверждается, как только оно израсходовано - так что оно ушло из очереди."

Если это так, могу ли я установить его в своем источнике потока данных Spring Cloud ИЛИ это настройка RabbitMQ или это что-то совсем другое

UPDATE:

Я добавил

spring.cloud.stream.bindings.input.group=consumerGroup

в файл application.properties моего процессора.

К сожалению, оба процессора получают одни и те же данные.

Нужно ли добавлять аналогичную запись в application.properties моего источника?

Нужно ли менять аннотацию на процессоре? В настоящее время это:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)

Нужно ли каким-либо образом изменять аннотацию на источнике? В настоящее время это:

@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = 
     @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))

Изменит ли это @Poller?

UPDATE:

Является ли свойство с именем spring.cloud.stream.instanceCount?

1 Ответ

0 голосов
/ 15 января 2019

Для потоковых приложений необходимо установить свойство ... consumer.group, чтобы они оба входили в одну группу и конкурировали за сообщения.

Но это должно произойти автоматически с SCDF.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...