Как правильно использовать группы потребителей в весеннем облачном потоке данных и rabbitmq? - PullRequest
0 голосов
/ 15 января 2019

А последует этому:

один источник SCDF, 2 процессора, но только 1 обрабатывает каждый элемент

this is the graph i'm trying to implement

2 процессора (del-1 и del-2) на рисунке получают одинаковые данные в течение миллисекунд друг от друга. Я пытаюсь настроить это так, чтобы del-2 никогда не получал то же самое, что del-1, и наоборот. Очевидно, что я что-то настроил неправильно, но я не уверен, где.

Мой процессор имеет следующие свойства application.properties

spring.application.name=${vcap.application.name:sample-processor}
info.app.name=@project.artifactId@
info.app.description=@project.description@
info.app.version=@project.version@
management.endpoints.web.exposure.include=health,info,bindings
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration
spring.cloud.stream.bindings.input.group=input

Правильно ли указан "spring.cloud.stream.bindings.input.group"?

Вот код процессора:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Object transform(String inputStr) throws InterruptedException{

    ApplicationLog log = new ApplicationLog(this, "timerMessageSource");

    String message = " I AM [" + inputStr + "] AND I HAVE BEEN PROCESSED!!!!!!!";

    log.info("SampleProcessor.transform() incoming inputStr="+inputStr);

    return message;
}

Является ли аннотация @Transformer правильным способом связать этот бит кода с "spring.cloud.stream.bindings.input.group" из application.properties? Есть ли другие аннотации, необходимые?

Вот мой источник:

private String format = "EEEEE dd MMMMM yyyy HH:mm:ss.SSSZ";
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<String> timerMessageSource() {
    ApplicationLog log = new ApplicationLog(this, "timerMessageSource");
    String message = new SimpleDateFormat(format).format(new Date());
    log.info("SampleSource.timeMessageSource() message=["+message+"]");
    return () -> new GenericMessage<>(new SimpleDateFormat(format).format(new Date()));
}

Я запутался в "value = Source.OUTPUT". Означает ли это, что мой процессор должен называться по-другому?

Неужели включение @Poller вызывает у меня какие-то проблемы?

Вот как я определяю 2 потока процессора (del-1 и del-2) в оболочке SCDF:

stream create del-1 --definition ":split > processor-that-does-everything-sleeps5 --spring.cloud.stream.bindings.applicationMetrics.destination=metrics > :merge"

stream create del-2 --definition ":split > processor-that-does-everything-sleeps5 --spring.cloud.stream.bindings.applicationMetrics.destination=metrics > :merge"

Нужно ли там что-то делать по-другому?

Все это работает в Docker / K8s.

RabbitMQ задается как bitnami / rabbitmq: 3.7.2-r1 и конфигурируется со следующими реквизитами:

RABBITMQ_USERNAME: user
RABBITMQ_PASSWORD <redacted>:  
RABBITMQ_ERL_COOKIE <redacted>:  
RABBITMQ_NODE_PORT_NUMBER: 5672
RABBITMQ_NODE_TYPE: stats
RABBITMQ_NODE_NAME: rabbit@localhost
RABBITMQ_CLUSTER_NODE_NAME: 
RABBITMQ_DEFAULT_VHOST: /
RABBITMQ_MANAGER_PORT_NUMBER: 15672
RABBITMQ_DISK_FREE_LIMIT: "6GiB"

Нужны ли другие переменные среды?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...