Регулирование пропускной способности с помощью OMATPE - PullRequest
1 голос
/ 04 декабря 2011

Мой веб-сервер (специально созданный поверх Netty) использует веб-клиент (также специально созданный с помощью Netty) для отправки прокси-запросов к S3.

Client -> Webserver|Webclient -> S3

Цель системы - передать файлзагружает непосредственно в S3 с небольшим количеством логики:

  • Webserver принимает запрос клиента (POST);
  • Устанавливает Client читаемость канала в false и проверяет кучу вещей;
  • Когда все успешно проверено, он использует Webclient для подключения к S3;
  • Когда Webclient подключается к S3:
    1. itотправляет 100-Continue обратно клиенту
    2. устанавливает Client читаемость канала на true
  • С этого момента все чанки, полученные Webserver, передаютсяна Webclient для пересылки.

В (крайне маловероятном) случае соединение между Client и Webserver происходит быстрее, чем соединение между Webclient и S3,Мне нужно ограничить соединение между Client и Webserver.

Подход, который я выбралбыло просто хранить счетчик байтов, полученных Webserver (который увеличивается каждый раз, когда Client отправляет данные), и он уменьшается каждый раз, когда запись Webclient завершается.Всякий раз, когда объем данных в этом буфере превышает заданное пороговое значение, читаемость канала Client устанавливается на false.

Это прекрасно работает, пока я не добавлю OrderedMemoryAwareThreadPoolExecutor в конвейер сервера.

Простым решением является использование OioClientSocketChannelFactory на Webclient.Это приводит к тому, что вызовы к Channel.write блокируются, поэтому, когда messageReceived() вызывается на обработчике Webserver - и, следовательно, Channel.write вызывается на Webclient - регулирование происходит "естественно".

Однако, если я использую NioClientSocketChannelFactory на Webclient, то вызовы Channel.write становятся асинхронными, и регулирование перестает работать.

По сути, я замечаю, что Channel.setReadability(false)Кажется, ничего не дает, когда в конвейер вставляется OrderedMemoryAwareThreadPoolExecutor.

Как я могу выполнить регулирование с помощью OMATPE в конвейере?

Ответы [ 2 ]

2 голосов
/ 07 декабря 2011

Как сказал Джестан, пожалуйста, обратитесь к org.jboss.netty.handler.traffic.ChannelTrafficShapingHandler в мастере.Кроме того, вам придется настроить receiveBufferSizePredictorFactory так, чтобы он не возвращал слишком большое значение.В противном случае Netty просто выделит большой буфер и заполнит его очень быстро.Используйте AdaptiveReceiveBufferSizePredictorFactory с меньшим максимумом в сочетании с ChannelTrafficShapingHandler.

2 голосов
/ 04 декабря 2011

1) OrderedMemoryAwareThreadPoolExecutor также контролирует память канала (в вашем случае размер полученных данных) и приостанавливает / включает чтение, когда оно выше / ниже настроенного максимального размера (через конструктор OrderedMemoryAwareThreadPoolExecutor).

2) При использовании с ExecutionHandler обработчик может отбрасывать события состояния канала, если в контексте найдено какое-либо вложение (но это вложение контекста обычно устанавливается OrderedMemoryAwareThreadPoolExecutor, чтобы не позволять вышестоящим обработчикам восходящего потока изменять состояние канала и вызвать OutofMemoryException).

            boolean readSuspended = ctx.getAttachment() != null;
            if (readSuspended) {
                // Drop the request silently if MemoryAwareThreadPool has
                // set the flag.
                e.getFuture().setSuccess();
                return;
            }

Я думаю, вам нужно настроить минимальный, максимальный объем памяти канала OMATPE, или у вас может быть вложение контекста, приводящее к этой ситуации?

...