как получить ответ byteBuf в samehandler, когда запрос записан в outboundChannel на прокси-сервере netty - PullRequest
3 голосов
/ 02 апреля 2019

Я использую прокси-сервер netty следующим образом: Http-запрос приходит,

  • если в локальном кеше есть данные, запишите на канал и сбросьте
  • Если нет, извлеките данные с удаленного сервера, добавьте их в кеш и сбросьте

Мне трудно извлечь byteBuf из ответа в том же обработчике, где я пишу клиенту.

В приведенном ниже примере, если вы увидите channelRead метод HexDumpProxyFrontendHandler, вы увидите, как я получаю данные из кэша и записи. Я добавил комментарии в этом методе ниже, где я сталкиваюсь с трудностями

Этот код работает от начала до конца. поэтому его можно скопировать и проверить локально.

Я вижу FullHttpResponse объект в HexDumpProxyBackendhandler#channelRead. но внутри этого метода у меня нет ни ссылки на кеш, ни идентификатор, который я хочу добавить в кеш.

Я думаю, что это можно решить двумя способами, но я не совсем понимаю, как это можно сделать.

1) либо получить ссылку на кэш и идентификатор в HexdumpProxyBackendHandler, тогда это станет легко. но hexDumpBackendhander создается в channelActive из HexDumpFrontendHandler, и в этот момент я не проанализировал свой входящий запрос

2) получить bytebuf ответа, извлеченный из HexdumpFrontendHandler#dchannelRead, в этом случае это просто вставка кэша.

HexDumpProxy.java

public final class HexDumpProxy {

static final int LOCAL_PORT = Integer.parseInt(System.getProperty("localPort", "8082"));
static final String REMOTE_HOST = System.getProperty("remoteHost", "api.icndb.com");
static final int REMOTE_PORT = Integer.parseInt(System.getProperty("remotePort", "80"));
static Map<Long,String> localCache = new HashMap<>();
public static void main(String[] args) throws Exception {
    System.err.println("Proxying *:" + LOCAL_PORT + " to " + REMOTE_HOST + ':' + REMOTE_PORT + " ...");
    localCache.put(123L, "profile1");
    localCache.put(234L, "profile2");
    // Configure the bootstrap.
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new HexDumpProxyInitializer(localCache, REMOTE_HOST, REMOTE_PORT))
         .childOption(ChannelOption.AUTO_READ, false)
         .bind(LOCAL_PORT).sync().channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

}

HexDumpProxyInitializer.java

public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {

private final String remoteHost;
private final int remotePort;
private Map<Long, String> cache;

public HexDumpProxyInitializer(Map<Long,String> cache, String remoteHost, int remotePort) {
    this.remoteHost = remoteHost;
    this.remotePort = remotePort;
    this.cache=cache;
}

@Override
public void initChannel(SocketChannel ch) {
    ch.pipeline().addLast(
            new LoggingHandler(LogLevel.INFO),
            new HttpServerCodec(),
            new HttpObjectAggregator(8*1024, true),
            new HexDumpProxyFrontendHandler(cache, remoteHost, remotePort));
}

}

HexDumpProxyFrontendHandler.java

 public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
private final String remoteHost;
private final int remotePort;
private Channel outboundChannel;
private Map<Long, String> cache;

public HexDumpProxyFrontendHandler(Map<Long, String> cache, String remoteHost, int remotePort) {
    this.remoteHost = remoteHost;
    this.remotePort = remotePort;
    this.cache = cache;
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
    final Channel inboundChannel = ctx.channel();

    // Start the connection attempt.
    Bootstrap b = new Bootstrap();
    b.group(inboundChannel.eventLoop())
     .channel(ctx.channel().getClass())
     .handler((new ChannelInitializer() {
         protected void initChannel(Channel ch) {
             ChannelPipeline var2 = ch.pipeline();
             var2.addLast((new HttpClientCodec()));
             var2.addLast(new HttpObjectAggregator(8192, true));
             var2.addLast(new HexDumpProxyBackendHandler(inboundChannel));
         }
     }))
     .option(ChannelOption.AUTO_READ, false);
    ChannelFuture f = b.connect(remoteHost, remotePort);
    outboundChannel = f.channel();
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // connection complete start to read first data
                inboundChannel.read();
            } else {
                // Close the connection if the connection attempt has failed.
                inboundChannel.close();
            }
        }
    });
}

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof HttpRequest) {
        System.out.println("msg is instanceof httpRequest");
        HttpRequest req = (HttpRequest)msg;
        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
        String userId = queryStringDecoder.parameters().get("id").get(0);
        Long id = Long.valueOf(userId);
        if (cache.containsKey(id)){
            StringBuilder buf = new StringBuilder();
            buf.append(cache.get(id));
            writeResponse(req, ctx, buf);
            closeOnFlush(ctx.channel());
            return;
        }
    }
    if (outboundChannel.isActive()) {
        outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // was able to flush out data, start to read the next chunk
                    ctx.channel().read();
                } else {
                    future.channel().close();
                }
            }
        });
    }

    //get response back from HexDumpProxyBackendHander and write to cache
    //basically I need to do cache.put(id, parse(response));
    //how to get response buf from inboundChannel here is the question I am trying to solve
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
    if (outboundChannel != null) {
        closeOnFlush(outboundChannel);
    }

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    closeOnFlush(ctx.channel());
}

/**
 * Closes the specified channel after all queued write requests are flushed.
 */
static void closeOnFlush(Channel ch) {
    if (ch.isActive()) {
        ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }
}

//borrowed from HttpSnoopServerHandler.java in snoop example
private boolean writeResponse(HttpRequest request, ChannelHandlerContext ctx, StringBuilder buf) {
    // Decide whether to close the connection or not.
    boolean keepAlive = HttpUtil.isKeepAlive(request);
    // Build the response object.
    FullHttpResponse response = new DefaultFullHttpResponse(
            HTTP_1_1, request.decoderResult().isSuccess()? OK : BAD_REQUEST,
            Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8));

    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");

    if (keepAlive) {
        // Add 'Content-Length' header only for a keep-alive connection.
        response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        // Add keep alive header as per:
        // - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }

    // Encode the cookie.
    String cookieString = request.headers().get(HttpHeaderNames.COOKIE);
    if (cookieString != null) {
        Set<Cookie> cookies = ServerCookieDecoder.STRICT.decode(cookieString);
        if (!cookies.isEmpty()) {
            // Reset the cookies if necessary.
            for (io.netty.handler.codec.http.cookie.Cookie cookie: cookies) {
                response.headers().add(HttpHeaderNames.SET_COOKIE, io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT.encode(cookie));
            }
        }
    } else {
        // Browser sent no cookie.  Add some.
        response.headers().add(HttpHeaderNames.SET_COOKIE, io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT.encode("key1", "value1"));
        response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode("key2", "value2"));
    }

    // Write the response.
    ctx.write(response);

    return keepAlive;
}
* *} Тысяча сорок-девять

HexDumpProxyBackendHandler.java

public class HexDumpProxyBackendHandler extends ChannelInboundHandlerAdapter {

private final Channel inboundChannel;

public HexDumpProxyBackendHandler(Channel inboundChannel) {
    this.inboundChannel = inboundChannel;
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ctx.read();
}

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof FullHttpResponse) {
        System.out.println("this is fullHttpResponse");
    }
    inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                ctx.channel().read();
            } else {
                future.channel().close();
            }
        }
    });
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
    HexDumpProxyFrontendHandler.closeOnFlush(inboundChannel);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    HexDumpProxyFrontendHandler.closeOnFlush(ctx.channel());
}

}

P.S .: Я взял большую часть кода из проекта netty-example и настроил его

EDIT

Согласно предложениям Ferrygig, я изменил FrontEndChannelHander # channelRead следующим образом. Я удалил channelActive и реализовал метод записи

@ Override public void channelRead (окончательный ChannelHandlerContext ctx, объект msg) {

if (msg instanceof HttpRequest) {
    System.out.println("msg is instanceof httpRequest");
    HttpRequest req = (HttpRequest)msg;
    QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
    String userId = queryStringDecoder.parameters().get("id").get(0);
    id = Long.valueOf(userId);
    if (cache.containsKey(id)){
        StringBuilder buf = new StringBuilder();
        buf.append(cache.get(id));
        writeResponse(req, ctx, buf);
        closeOnFlush(ctx.channel());
        return;
    }

    final Channel inboundChannel = ctx.channel();

    //copied from channelActive method

    // Start the connection attempt.
    Bootstrap b = new Bootstrap();
    b.group(inboundChannel.eventLoop())
            .channel(ctx.channel().getClass())
            .handler((new ChannelInitializer() {
                protected void initChannel(Channel ch) {
                    ChannelPipeline var2 = ch.pipeline();
                    var2.addLast((new HttpClientCodec()));
                    var2.addLast(new HttpObjectAggregator(8192, true));
                    var2.addLast(new HexDumpProxyBackendHandler(inboundChannel, cache));
                }
            }));
            //.option(ChannelOption.AUTO_READ, false);
    ChannelFuture f = b.connect(remoteHost, remotePort);
    outboundChannel = f.channel();
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // connection complete start to read first data
                inboundChannel.read();
            } else {
                // Close the connection if the connection attempt has failed.
                inboundChannel.close();
            }
        }
    });
}
if (outboundChannel.isActive()) {
    outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                // was able to flush out data, start to read the next chunk
                ctx.channel().read();
            } else {
                future.channel().close();
            }
        }
    });
}

Ответы [ 2 ]

1 голос
/ 05 апреля 2019

Существует несколько способов решения этой проблемы, и путь к достижению вашей конечной цели различен.

В настоящее время вы используете топологию «1 входящее подключение - 1 исходящее подключение», это немного упрощает проектирование системы, поскольку вам не нужно беспокоиться о синхронизации нескольких запросов в одном исходящем потоке.

В данный момент ваш обработчик веб-интерфейса расширяет ChannelInboundHandlerAdapter, он только перехватывает «пакеты», поступающие в ваше приложение, если мы его расширяем ChannelDuplexHandler, мы также можем обрабатывать «пакеты», выходящие приложений.

Чтобы приблизиться к этому пути, нам нужно обновить класс HexDumpProxyFrontendHandler, чтобы расширить ChannelDuplexHandler (пока назовем его CDH).

Следующим шагом в этом процессе является переопределение метода write, поступающего из CDH , чтобы мы могли перехватывать, когда бэкэнд отправляет нам ответ обратно.

После того, как мы создали метод записи, нам нужно обновить нашу (не поточнобезопасную) карту, вызвав метод put.

public class HexDumpProxyFrontendHandler extends ChannelDuplexHandler {
    Long lastId;
    // ...
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpRequest) {
            System.out.println("msg is instanceof httpRequest");
            HttpRequest req = (HttpRequest)msg;
            QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
            String userId = queryStringDecoder.parameters().get("id").get(0);
            Long id = Long.valueOf(userId);
            lastId = id; // Store ID of last request
            // ...
        }
        // ...
    }
    // ...
    public void write(
        ChannelHandlerContext ctx,
        java.lang.Object msg,
        ChannelPromise promise
    ) throws java.lang.Exception {

        if (msg instanceof FullHttpResponse) {
            System.out.println("this is fullHttpResponse");
            FullHttpResponse full = (FullHttpResponse)msg;
            cache.put(lastId, parse(full)); // TODO: Include a system here to convert the request to a string
        }
        super.write(ctx, msg, promise);
    }
    // ...
}

Мы еще не закончили, пока у нас есть код, нам все еще нужно исправить несколько ошибок в других местах кода.

Не поточно-ориентированная карта (критическая ошибка)

Одна из тех ошибок в том, что вы используете нормальную хэш-карту для обработки вашего кэша. Проблема в том, что это не безопасно для потоков, если к вашему приложению одновременно подключаются несколько человек, могут произойти странные вещи, включая полное повреждение карты при обновлении внутренней структуры карты.

Чтобы противостоять этой проблеме, мы собираемся «обновить» карту до ConcurrentHashMap, эта карта имеет специальные структуры для работы с несколькими потоками, запрашивающими и сохраняющими данные одновременно, без огромных потерь в производительности. (если производительность является основной проблемой, вы можете получить более высокую производительность, используя хэш-карту для каждого потока вместо глобального кэша, но это означает, что каждый ресурс может быть кэширован до количества потоков.

Нет правил удаления кэша (серьезная ошибка)

В настоящее время нет кода для удаления устаревших ресурсов, это означает, что кэш будет заполняться до тех пор, пока у программы не останется памяти, и затем произойдет впечатляющий крах.

Эту проблему можно решить либо с помощью реализации карты, которая обеспечивает как потокобезопасный доступ, так и так называемые правила удаления, либо с помощью уже готовых решений для кэширования, таких как Кэши Gnuava .

Неправильная обработка конвейерной передачи HTTP (незначительная ошибка)

Одной из менее известных функций HTTP является конвейерная обработка , это в основном означает, что клиент может отправить на сервер другой запрос, без , ожидая ответа на предыдущий запрос. К ошибкам этого типа относятся серверы, которые обменивают содержимое обоих запросов или даже полностью их искажают.

Несмотря на то, что конвейерные запросы в наши дни редки, все больше и больше поддерживается HTTP2, а также известно, что на нем есть неработающие серверы, это все же происходит с некоторыми инструментами CLI, которые его используют.

Чтобы решить эту проблему, ТОЛЬКО прочитайте запрос ПОСЛЕ того, как вы отправили предыдущий ответ, один из способов сделать это - сохранить список запросов или перейти к более продвинутым предварительным решениям

1 голос
/ 03 апреля 2019

гроза

Возможно, я ошибаюсь, когда я читаю эту часть вашего HexDumpProxyFrontendHandler, я чувствую, что, возможно, что-то неправильно (я поставил свои комментарии немного вперед в соответствии с правильным стилем, чтобы сделать их видимыми):

 // Not incorrect but better to have only one bootstrap and reusing it
    Bootstrap b = new Bootstrap(); 
    b.group(inboundChannel.eventLoop())
            .channel(ctx.channel().getClass())
            .handler(new HexDumpProxyBackendHandler(inboundChannel))
 // I know what AUTO_READ false is, but my question is why you need it?
            .option(ChannelOption.AUTO_READ, false);
    ChannelFuture f = b.connect(remoteHost, remotePort);
 // Strange to me to try to get the channel while you did not test yet it is linked
    outboundChannel = f.channel();
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
 // Maybe you should start to send there, therefore getting the outboundChannel right there?
 // add a log in order to see if you come there
 // probably you have to send first, before asking to read anything?
 // position (1)
                inboundChannel.read();
            } else {
                inboundChannel.close();
            }
        }
    });
 // I suggest to move this in position named (1)
    if (outboundChannel.isActive()) {
 // maybe a log to see if anything will be written?
        outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    System.out.println("success!! - FrontEndHandler");
                    ctx.channel().read();
                } else {
                    future.channel().close();
                }
            }
        });
    }

Для меня, похоже, вы не дождались открытия канала. Вам не хватает какого-то журнала при отправке на провод, чтобы гарантировать, что вы действительно что-то отправляете (в журналах мы можем видеть только то, что соединение открыто, а затем закрыто в основном, между ними ничего нет).

Может быть, еще несколько журналов могут помочь нам и вам?

...