Передача данных между двумя точками с использованием прерываний armeria / netty - PullRequest
0 голосов
/ 29 апреля 2019

У меня есть приложение, которое на основе полученного пути должно передавать файл между openstack-swift и s3.

В настоящее время я использую фреймворк Armeria, основанный на netty

Передача прерывается через некоторое время

Тайм-ауты максимальны для обоих клиентов, и точка, в которой они зависают, не обязательно всегда одинакова.

  @Override
    protected HttpResponse doPost(ServiceRequestContext context, HttpRequest request) {
        final CompletableFuture<HttpResponse> responseFuture = new CompletableFuture<>();

        CompletableFuture<HttpResponse> requestMessageFuture = request.aggregate().thenApplyAsync((AggregatedHttpMessage message) -> {
            // Get the object key from the message
            JSONObject o = new JSONObject(message.content().toStringUtf8());
            String key = o.getString("key");
            String token = request.headers().get(AsciiString.of("x-auth-token"));
            CompletableFuture<URL> signedUrlFuture = S3Utils.getInstance().getS3SignedUrl(com.amazonaws.HttpMethod.PUT, key);
            // Lock and check for lock status
            //Lock lock = lockKey(key);
            Lock lock = null;

            log.info("dbg: before get");

            HttpHeaders getHeaders = HttpHeaders.of(
                    AsciiString.of("x-auth-token"), token,
                    AsciiString.of(":path"), "/v1/" + key,
                    AsciiString.of(":method"), "GET"
            );

            log.info(token + "  /v1/" + key);

            signedUrlFuture.whenComplete((signedUrl, throwable) -> swiftClient.execute(HttpRequest.of(getHeaders))
                    .subscribe(new Subscriber<HttpObject>() {
                        private HttpRequestWriter writer;
                        private Subscription subscription;
                        private long size = 0;
                        private long tSize = 0;

                        @Override
                        public void onSubscribe(Subscription subscription) {
                            this.subscription = subscription;
                            subscription.request(1);
                        }

                        @Override
                        public final void onNext(HttpObject o) {
                            log.info("dbg: on next");
                            if (o instanceof HttpHeaders) {
                                log.info("dbg: on next headers");
                                onHeaders((HttpHeaders) o);
                                subscription.request(1);
                            } else if (o instanceof HttpData) {
                                log.info("dbg: on next other");
                                log.info("size: " + size);
                                log.info("tran: " + tSize);

                                writer.write(o);

                                // We increase the transferred data counter.
                                tSize += ((HttpData) o).length();

                                writer.onDemand(() -> {
                                    log.info("dbg: on next demand");
                                    subscription.request(1);
                                });
                            }
                        }

                        private void onHeaders(HttpHeaders headers) {
                            log.info("dbg: on headers");

                            if (headers.status().equals(HttpStatus.OK)) {

                                size = headers.getLong(AsciiString.of("Content-Length"));

                                headers.set(AsciiString.of(":path"), signedUrl.getPath());
                                headers.set(AsciiString.of(":method"), "PUT");

                                log.info("dbg: headers: " + headers);

                                writer = HttpRequest.streaming(translateHeaders(headers, key));

                                s3HttpClient.execute(writer).closeFuture();
                            } else {
                                log.error("key " + key + " error, status code: " + headers.status());
                                writer.close();
                                subscription.cancel();
                            }
                        }

                        @Override
                        public void onError(Throwable throwable) {
                            log.error("error migrating, " + throwable);
                            writer.close(throwable);
                            responseFuture.complete(unlockingResponse(500, lock, key));
                        }

                        @Override
                        public void onComplete() {
                            log.info("completed, " + key);
                            writer.close();
                            responseFuture.complete(unlockingResponse(200, lock, key));
                        }
                    }));
            return HttpResponse.from(responseFuture);
        });
        return HttpResponse.from(requestMessageFuture);
    }

похоже, что иногда он зависает, а иногда выдает ClosedPublisherException в этой строке:

writer.write(o);
...