Мой веб-сервер (специально созданный поверх Netty) использует веб-клиент (также специально созданный с помощью Netty) для отправки прокси-запросов к S3.
Client -> Webserver|Webclient -> S3
Цель системы - передать файлзагружает непосредственно в S3 с небольшим количеством логики:
Webserver
принимает запрос клиента (POST); - Устанавливает
Client
читаемость канала в false и проверяет кучу вещей; - Когда все успешно проверено, он использует
Webclient
для подключения к S3
; - Когда
Webclient
подключается к S3
: - itотправляет 100-Continue обратно клиенту
- устанавливает
Client
читаемость канала на true
- С этого момента все чанки, полученные
Webserver
, передаютсяна Webclient
для пересылки.
В (крайне маловероятном) случае соединение между Client
и Webserver
происходит быстрее, чем соединение между Webclient
и S3
,Мне нужно ограничить соединение между Client
и Webserver
.
Подход, который я выбралбыло просто хранить счетчик байтов, полученных Webserver
(который увеличивается каждый раз, когда Client
отправляет данные), и он уменьшается каждый раз, когда запись Webclient
завершается.Всякий раз, когда объем данных в этом буфере превышает заданное пороговое значение, читаемость канала Client
устанавливается на false
.
Это прекрасно работает, пока я не добавлю OrderedMemoryAwareThreadPoolExecutor
в конвейер сервера.
Простым решением является использование OioClientSocketChannelFactory
на Webclient
.Это приводит к тому, что вызовы к Channel.write
блокируются, поэтому, когда messageReceived()
вызывается на обработчике Webserver
- и, следовательно, Channel.write
вызывается на Webclient
- регулирование происходит "естественно".
Однако, если я использую NioClientSocketChannelFactory
на Webclient
, то вызовы Channel.write
становятся асинхронными, и регулирование перестает работать.
По сути, я замечаю, что Channel.setReadability(false)
Кажется, ничего не дает, когда в конвейер вставляется OrderedMemoryAwareThreadPoolExecutor
.
Как я могу выполнить регулирование с помощью OMATPE в конвейере?