весенний облачный поток rabbitmq потребительский динамический c масштабирование - PullRequest
0 голосов
/ 13 января 2020

Использование весеннего облачного потока с версией RabbitMQ. Я хочу добиться потребительского автоматического масштабирования, как объяснено здесь . с приведенной ниже конфигурацией у меня есть 2 одновременных потребителей. Но SimpleMessageListenerContainer не может динамически добавлять потребителей. Я проверил, что экземпляр SML C инициализируется как со свойствами параллелизма, так и со свойствами максимального параллелизма. Есть ли что-то, чего мне не хватает, чтобы получить динамическое c масштабирование?

Примечание: я опубликовал 200 сообщений в input1 exchange в rabbitmq.

pom . xml файл:

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test</groupId>
    <artifactId>consumerex</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>graphql</name>
    <url>http://maven.apache.org</url>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>3.0.1.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

файл application.yml:

spring:
  cloud:
    stream:
      bindings:
        input1:
          destination: input1
          binder: local_rabbit
          group: logMessageConsumers
          consumer:
            concurrency: 2
        input2:
          destination: input2
          binder: local_rabbit
          group: logMessageConsumers
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: /
      rabbit:
        bindings:
          input1:
            consumer:
              maxConcurrency: 10
server:
  port: 0
management:
  health:
    binders:
       enabled: true

Приложение. java файл:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@SpringBootApplication
@EnableBinding(MyProcessor.class)
public class App 
{
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    @StreamListener("input1")
    public void enrichLogMessage(String log) {
        //code to keep current consumer busy. So auto scalling can happen.
        long time = System.currentTimeMillis() + 120000;
        while(time> System.currentTimeMillis()) {

        }
        System.out.println(Thread.currentThread().getName() + "    input1       "  + log);

    }

    @StreamListener("input2")
    public void input2(String log) {
        System.out.println(Thread.currentThread().getName() + "     input2         "  + log);
    }
}

MyProcessor. java файл:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface MyProcessor {
    @Input("input1")
    SubscribableChannel input1();

    @Input("input2")
    SubscribableChannel input2();

}

1 Ответ

1 голос
/ 14 января 2020

Алгоритм увеличения параллелизма выполняется в потоке (ах) слушателя, поэтому спящий поток, подобный этому, будет препятствовать его работе.

Следующее приложение показывает, что оно работает как задумано ...

@SpringBootApplication
@EnableBinding(Sink.class)
public class So59723356Application {


    private static final Logger log = LoggerFactory.getLogger(So59723356Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So59723356Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) throws InterruptedException {
        log.info(in);
        Thread.sleep(1_000);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> IntStream.range(0, 50).forEach(i -> template.convertAndSend("input", "input", "foo"));
    }

}
spring.cloud.stream.bindings.input.group=group
spring.cloud.stream.bindings.input.consumer.concurrency=1
spring.cloud.stream.rabbit.bindings.input.consumer.max-concurrency=5

и

2020-01-14 14:20:51.849  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:20:51.852  INFO 71536 --- [           main] com.example.demo.So59723356Application   : Started So59723356Application in 1.729 seconds (JVM running for 2.123)
2020-01-14 14:20:52.855  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:20:53.862  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:20:54.869  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:20:55.874  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:20:56.882  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:20:57.885  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:20:58.889  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:20:59.894  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:00.901  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:01.906  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:02.911  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:03.917  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:03.917  INFO 71536 --- [  input.group-2] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:04.922  INFO 71536 --- [  input.group-2] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:04.922  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:05.924  INFO 71536 --- [  input.group-2] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:05.924  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:06.930  INFO 71536 --- [  input.group-2] com.example.demo.So59723356Application   : foo
2020-01-14 14:21:06.930  INFO 71536 --- [  input.group-1] com.example.demo.So59723356Application   : foo

См. Начало нового потока в 14:21:03.917.

...