Проксирование больших файлов с использованием armeria или asynchttpclient - PullRequest
0 голосов
/ 30 января 2019

Итак, у меня есть прокси, созданный с помощью armeria, который должен начать загрузку из источника и одновременно передавать его наружу в пункт назначения.Файл должен быть потоковым, так как это прокси, и я не хочу хранить его и пересылать.Это приложение должно обрабатывать большой трафик.

Любые советы, где я должен искать делать это исключительно в Армерии?

Я попытался использовать asyncHttpClient, но я полагаю, что он блокирует основной поток или что-то еще, потому что запросы начинают накапливаться.

Весь код ниже находится внутри supplyAsync

Есть ли способя могу это исправить, или, что еще лучше, сделать это чисто в armeria?

public HttpResponse doLogic(URL getUrl, String requestId, String key, Lock lock, String token) {
    // ----
    PipedOutputStream pout = new PipedOutputStream();
    BodyDeferringAsyncHandler bodyDeferringAsyncHandler = new BodyDeferringAsyncHandler(pout);
    Future<Response> respFuture = asyncHttpClient.prepareGet(getUrl.toString()).execute(bodyDeferringAsyncHandler);

    CompletableFuture<HttpResponse> r = CompletableFuture.supplyAsync(() -> {
        try (PipedInputStream pin = new PipedInputStream(pout)) {
            Response resp = bodyDeferringAsyncHandler.getResponse();
            if (resp.getStatusCode() == 200) {
                try (InputStream is = new BodyDeferringAsyncHandler.BodyDeferringInputStream(respFuture, bodyDeferringAsyncHandler, pin)) {
                    HttpHeaders headers = translateHeaders(resp.getHeaders());
                    if (headers.get("etag").contains("-")) {
                        // We remove the etag, since it comes from a multipart upload, it isn't an md5 hash of the file.
                        headers.remove("etag");
                    }

                    PipedOutputStream pout2 = new PipedOutputStream();
                    BodyDeferringAsyncHandler bodyDeferringAsyncHandler2 = new BodyDeferringAsyncHandler(pout2);
                    Future<Response> putResp = asyncHttpClient.preparePut(Config.getString("swift.proxy.base.url") + "v1/" + key)
                            .setHeaders(headers)
                            .addHeader("x-auth-token", token)
                            .setBody(is).execute(bodyDeferringAsyncHandler2);

                    Response putR = null;
                    try {
                        putR = bodyDeferringAsyncHandler2.getResponse();
                        if (putR.getStatusCode() != 201 && putR.getStatusCode() != 202) {
                            log.error("[requestId: " + requestId + "] swift returned " + putR.getStatusCode());
                            return unlockingResponse(putR.getStatusCode(), lock, key);
                        }
                        log.info("[requestId: " + requestId + "] File: " + key + " rollbacked!");
                        return unlockingResponse(200, lock, key);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return unlockingResponse(422, lock, key);
                }
            } else {
                log.warn("[requestId: " + requestId + "] s3 came back with status code: " + resp.getStatusCode());
                return unlockingResponse(200, lock, key);
            }
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
            return unlockingResponse(500, lock, key);
        }
    });

    return HttpResponse.from(r);
}

Я ожидаю, что он будет передавать потоковые файлы из источника в место назначения и сможет обрабатывать большой объем трафика.

...