Как правильно использовать ChunkedStream - PullRequest
3 голосов
/ 27 января 2012

Вот мой пример использования ... У меня есть восходящий сервис, который отправляет данные моего приложения Netty по сети, и эти данные необходимо опубликовать нескольким клиентам, подключенным к Netty. Данные, передаваемые клиентам, должны быть HTTP «Transfer-Encoding: chunked».

Я нашел ChunkedStream и, хотя, возможно, я мог бы создать PipedInputStream и PipedOutputStream (подключенные к PipedInputStream) и записать ChunkedStream в канал. Затем, когда данные будут получены от моего вышестоящего сервиса, я могу записать данные в PipedOutputStream каналов, и они будут отправлены клиентам:

В канал подключен

PipedInputStream in = new PipedInputStream();
PipedOutputStream out = new PipedOutputStream(in);
ctx.getChannel().write( new PersistentChunkedStream(in) );

Отдельный поток публикует данные для подключенных каналов

ChannelBuffer buff = ChannelBuffers.copiedBuffer("FOO",CharsetUtil.UTF_8);
out.write( buff.array() );
channel.get(ChunkedWriteHandler.class).resumeTransfer();

Мне пришлось расширить ChunkedStream, чтобы вернуть null из nextChunk, если доступно 0 байт (чтобы «приостановить» запись без зависания потока), поэтому я вызываю resumeTransfer после записи в PipedOutputStream соответствующего канала. Когда я отлаживаю и перебираю код, я вижу, как вызывается flush из ChunkedWriteHandler, что вызывает:

Channels.write(ctx, writeFuture, chunk, currentEvent.getRemoteAddress());

с байтами, которые я записал в PipedOutputStream,, но клиент так и не получил.

HTTP curl

~ $ curl -vN http://localhost:8080/stream
* About to connect() to localhost port 8080 (#0)
*   Trying 127.0.0.1... connected
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /stream HTTP/1.1
> User-Agent: curl/7.19.7 (universal-apple-darwin10.0) libcurl/7.19.7 OpenSSL/0.9.8r zlib/1.2.3
> Host: localhost:8080
> Accept: */*
> 
< HTTP/1.1 200 OK
< Transfer-Encoding: chunked
< 
### NOTE: NO "FOO" TRANSMIT BACK ###

Есть мысли? Может быть, есть лучший способ сделать это?

Ответы [ 3 ]

4 голосов
/ 27 января 2012

Интересно, почему вы даже хотите использовать PipedInputStream / PipedOutputStream. Я думаю, что было бы проще / проще просто вызвать Channel.write (..) напрямую без ваших данных. Просто имейте в виду, что в Channel.write (..) необходимо передавать как можно больше данных, поскольку это дорогостоящая операция.

Вы можете вызывать Channel.write (..) из любого потока, который вам нужен, так как он безопасен для потоков.

3 голосов
/ 19 декабря 2012

Я знаю, что это старый вопрос, но, надеюсь, это кому-нибудь поможет.

ChunkedStream НЕ подразумевает HTTP Chunking ... это неудачное столкновение именования, насколько я могу судить.Чанкованные потоки предназначены только для того, чтобы избежать загрузки всего элемента в память, фактически ChunkedWriter после каждого чанка вызывает запрос в ChunkedStream для запроса дополнительных данных.

Как выясняется, вы МОЖЕТЕ использовать парадигму ChunkedStream для созданиято, что делает для вас HTTP-чанкинг из стандартного потока ввода.Код ниже реализует ChunkedInput и принимает InputStream.Он также автоматически добавляет завершающий http-блок для обозначения EOF, но делает это только один раз в соответствии со спецификацией ChunkedInput.

public class HttpChunkStream implements ChunkedInput {

private static final int CHUNK_SIZE = 8192;
boolean eof = false;

InputStream data;

HttpChunkStream (InputStream data) {
    this.data= data;
}

byte[] buf = new byte[CHUNK_SIZE];
@Override
public Object nextChunk() throws Exception {
    if (eof)
        return null;        
    int b = data.read(buf);
    if (b==-1) {
        eof=true;
        return new DefaultHttpChunk(ChannelBuffers.EMPTY_BUFFER);           
    }                   
    DefaultHttpChunk c = new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(buf,0,b));       
    return c;
}

@Override
public boolean isEndOfInput() throws Exception {
    return eof;
}

@Override
public boolean hasNextChunk() throws Exception {
    return isEndOfInput()==false;
}

@Override
public void close() throws Exception {
    Closeables.closeQuietly(data);              
}

}

3 голосов
/ 31 января 2012

Просто добавьте еще немного содержимого к ответу, предоставленному Норманом.

При отправке произвольных фрагментированных данных сначала необходимо отправить новый DefaultHttpResponse (только один раз):

HttpResponse res = new DefaultHttpResponse();
res.setChunked(true);
res.setHeader(Names.TRANSFER_ENCODING, Values.CHUNKED);
channel.write(res);

Затем в любое время, когда вы хотите записать на канал с произвольным фрагментом, позвоните:

HttpChunk chunk = new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(str.getBytes(CharsetUtil.UTF_8)));
channel.write(chunk);
...