Как настроить Spring Integration для чтения из Redis с многопоточностью? - PullRequest
0 голосов
/ 30 мая 2019

Я настраиваю конфигурацию Spring Integration для чтения из redis с многопоточностью, но когда я запускаю свое приложение, Spring создает только один поток.

Я создаю int-redis: queue-inbound-channel-adapter с заданием executor с размером пула = 500 и емкостью очереди = 0.

<redis:queue-inbound-channel-adapter
            id="fromRedis" channel="privateAggregationExecutorChannel" queue="${instance}_private"
            receive-timeout="1000" recovery-interval="3000" expect-message="false" error-channel="distributionErrors"
            auto-startup="false" task-executor="robotTaskExecutor"/>

<task:executor
            id="robotTaskExecutor"
            pool-size="500"
            queue-capacity="0"
            keep-alive="50"
            rejection-policy="CALLER_RUNS" />

<int:service-activator input-channel="privateAggregationExecutorChannel" ref="aggregationExecutor" method="run" />

Я не знаю, что я делаю не так или есть что-то, чего мне не хватает. Я ценю вашу помощь.

1 Ответ

0 голосов
/ 30 мая 2019

Это верно. RedisQueueMessageDrivenEndpoint действительно однопоточный компонент:

@Override
protected void doStart() {
    if (!this.active) {
        this.active = true;
        this.restart();
    }
}

private void restart() {
    this.taskExecutor.execute(new ListenerTask());
}

Как вы видите, только один ListenerTask запланирован из адаптеров этого канала.

Чтобы сделать его многопоточным, лучше иметь ExecutorChannel для отправки сообщений от этого канального адаптера. Таким образом, даже если RedisQueueMessageDrivenEndpoint является однопоточным, вы все равно будете иметь многопоточную обработку.

Мы только что поняли, что введение параллелизма в этот компонент будет немного сложнее, когда мы можем обойти другой простой способ.

Другой подход заключается в том, чтобы иметь несколько <redis:queue-inbound-channel-adapter> определений для одного и того же канала queue и same для отправки.

...