Реализация AWS SDK v2 SdkAsyncHttpClient с использованием Java 11 java.net.http HttpClient sendAsync - PullRequest
0 голосов
/ 15 октября 2019

Я пытаюсь реализовать SdkAsyncHttpClient, который использует java.net.http.HttpClient из Java 11 (в частности, sendAsync). В SdkAsyncHttpClient нужно реализовать один метод: CompletableFuture<Void> execute(AsyncExecuteRequest asyncExecuteRequest). AsyncExecuteRequest позволяет получить подробную информацию о HTTP-запросе и, что особенно важно, SdkHttpContentPublisher. Это вписывается в реактивную модель Publisher / Subscriber, которую, судя по всему, HttpClient.sendAsync поддерживает «из коробки». Кажется, я близок к рабочей реализации, но не хватает (как минимум) одного важного шага: мне не удаётся добиться того, чтобы возвращаемый future когда-либо завершался.

Я думаю, что, возможно, упускаю что-то фундаментальное, что позволило бы связать их вместе напрямую, но пока это от меня ускользает.

Вот моя попытка наивной (и очень простой) реализации:

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
import software.amazon.awssdk.utils.AttributeMap;

import java.io.ByteArrayOutputStream;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

import static java.net.http.HttpClient.Version.HTTP_1_1;
import static java.net.http.HttpClient.Version.HTTP_2;
import static software.amazon.awssdk.http.Protocol.HTTP2;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_TIMEOUT;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.PROTOCOL;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.READ_TIMEOUT;

public class JavaAsyncHttpClient implements SdkAsyncHttpClient {
private final HttpClient httpClient;

public JavaAsyncHttpClient(AttributeMap options) {
    this.httpClient = HttpClient.newBuilder()
            .connectTimeout(options.get(CONNECTION_TIMEOUT))
            .version(options.get(PROTOCOL) == HTTP2 ? HTTP_2 : HTTP_1_1)
            .build();
}

/**
 * Converts the SDK request into a {@code java.net.http} request and dispatches it with
 * {@code HttpClient.sendAsync}.
 *
 * <p>All request headers are copied except {@code Content-Length} and {@code Host}.
 * POST and PUT take their body from the request content publisher; DELETE, HEAD and
 * OPTIONS are sent without a body; any method not listed in the switch leaves the
 * builder's method untouched.
 *
 * <p>NOTE(review): the response is never handed to
 * {@code asyncExecuteRequest.responseHandler()} in this method. Instead one
 * {@code BaosSubscriber} is subscribed to the <em>request</em> content publisher and is
 * also passed as the <em>response</em> body subscriber, and the future given to that
 * subscriber is not kept anywhere.
 *
 * @param asyncExecuteRequest SDK request wrapper supplying the HTTP request and its content publisher
 * @return the {@code sendAsync} future mapped to {@code null}
 * @throws UnsupportedOperationException if the request method is PATCH
 */
@Override
public CompletableFuture<Void> execute(AsyncExecuteRequest asyncExecuteRequest) {
    SdkHttpRequest request = asyncExecuteRequest.request();
    HttpRequest.Builder requestBuilder = HttpRequest.newBuilder().uri(request.getUri());
    for (Map.Entry<String, List<String>> header : request.headers().entrySet()) {
        // avoid java.lang.IllegalArgumentException: restricted header name: "Content-Length"
        if (!header.getKey().equalsIgnoreCase("Content-Length") && !header.getKey().equalsIgnoreCase("Host")) {
            // a header may carry several values; each one is added separately
            for (String headerVal : header.getValue()) {
                requestBuilder = requestBuilder.header(header.getKey(), headerVal);
            }
        }
    }

    switch (request.method()) {
        case POST:
            // body is streamed from the SDK publisher through a Flow adapter
            requestBuilder = requestBuilder.POST(HttpRequest.BodyPublishers.fromPublisher(
                    toFlowPublisher(asyncExecuteRequest.requestContentPublisher())));
            break;
        case PUT:
            requestBuilder = requestBuilder.PUT(HttpRequest.BodyPublishers.fromPublisher(
                    toFlowPublisher(asyncExecuteRequest.requestContentPublisher())));
            break;
        case DELETE:
            requestBuilder = requestBuilder.DELETE();
            break;
        case HEAD:
            requestBuilder = requestBuilder.method("HEAD", HttpRequest.BodyPublishers.noBody());
            break;
        case PATCH:
            // thrown synchronously, before any future is created
            throw new UnsupportedOperationException("PATCH not supported");
        case OPTIONS:
            requestBuilder = requestBuilder.method("OPTIONS", HttpRequest.BodyPublishers.noBody());
            break;
    }
    // Need to use BodyHandlers.ofPublisher() or is that a dead end? How can link up the AWS Publisher/Subscribers
        // NOTE(review): this subscriber is attached to the request body publisher here ...
        Subscriber<ByteBuffer> subscriber = new BaosSubscriber(new CompletableFuture<>());
        asyncExecuteRequest.requestContentPublisher().subscribe(subscriber);
        HttpRequest httpRequest = requestBuilder.build();
        // ... and the very same instance is reused below as the response body subscriber.
        return httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.fromSubscriber(toFlowSubscriber(subscriber)))
                .thenApply(voidHttpResponse -> null);
}

/**
 * Wraps a Reactive Streams subscriber of single buffers as a {@link Flow.Subscriber}
 * of buffer batches, the shape expected by {@code HttpResponse.BodyHandlers.fromSubscriber}.
 *
 * <p>Every buffer of each incoming batch is forwarded in order. The previous version
 * forwarded only {@code item.get(0)}, which silently dropped the rest of a multi-buffer
 * batch and threw {@link IndexOutOfBoundsException} on an empty one.
 *
 * <p>NOTE(review): one batch can produce several downstream {@code onNext} calls while
 * demand is forwarded unchanged, so a subscriber that requests a bounded amount may
 * receive more items than it asked for.
 *
 * @param subscriber downstream subscriber receiving individual buffers
 * @return a Flow subscriber delegating all signals to {@code subscriber}
 */
private Flow.Subscriber<? super List<ByteBuffer>> toFlowSubscriber(Subscriber<ByteBuffer> subscriber) {
    return new Flow.Subscriber<List<ByteBuffer>>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscriber.onSubscribe(toAwsSubscription(subscription));
        }

        @Override
        public void onNext(List<ByteBuffer> item) {
            // Forward the whole batch, not just its first element.
            for (ByteBuffer buffer : item) {
                subscriber.onNext(buffer);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            subscriber.onError(throwable);
        }

        @Override
        public void onComplete() {
            subscriber.onComplete();
        }
    };
}

/**
 * Exposes a {@link Flow.Subscription} through the Reactive Streams {@code Subscription} interface.
 *
 * @param subscription the Flow subscription to delegate to
 * @return a view whose {@code request} and {@code cancel} calls are passed straight through
 */
private Subscription toAwsSubscription(Flow.Subscription subscription) {
    final class FlowBackedSubscription implements Subscription {
        @Override
        public void cancel() {
            subscription.cancel();
        }

        @Override
        public void request(long demand) {
            subscription.request(demand);
        }
    }

    return new FlowBackedSubscription();
}

/**
 * Presents the SDK content publisher as a {@link Flow.Publisher}.
 *
 * @param requestContentPublisher the SDK publisher that produces the request body
 * @return a Flow publisher that subscribes an adapted subscriber to {@code requestContentPublisher}
 */
private Flow.Publisher<ByteBuffer> toFlowPublisher(SdkHttpContentPublisher requestContentPublisher) {
    return new Flow.Publisher<ByteBuffer>() {
        @Override
        public void subscribe(Flow.Subscriber<? super ByteBuffer> flowSubscriber) {
            requestContentPublisher.subscribe(toAwsSubscriber(flowSubscriber));
        }
    };
}

/**
 * Wraps a {@link Flow.Subscriber} so it can be subscribed to a Reactive Streams publisher.
 *
 * @param subscriber the Flow subscriber that ultimately receives the signals
 * @return a Reactive Streams subscriber forwarding every signal to {@code subscriber}
 */
private Subscriber<? super ByteBuffer> toAwsSubscriber(Flow.Subscriber<? super ByteBuffer> subscriber) {
    final class FlowDelegatingSubscriber implements Subscriber<ByteBuffer> {
        @Override
        public void onSubscribe(Subscription upstream) {
            // the subscription has to be adapted in the opposite direction
            subscriber.onSubscribe(toFlowSubscription(upstream));
        }

        @Override
        public void onNext(ByteBuffer chunk) {
            subscriber.onNext(chunk);
        }

        @Override
        public void onError(Throwable failure) {
            subscriber.onError(failure);
        }

        @Override
        public void onComplete() {
            subscriber.onComplete();
        }
    }

    return new FlowDelegatingSubscriber();
}

/**
 * Exposes a Reactive Streams {@code Subscription} through the {@link Flow.Subscription} interface.
 *
 * @param subscription the Reactive Streams subscription to delegate to
 * @return a view whose {@code request} and {@code cancel} calls are passed straight through
 */
private Flow.Subscription toFlowSubscription(Subscription subscription) {
    final class AwsBackedSubscription implements Flow.Subscription {
        @Override
        public void cancel() {
            subscription.cancel();
        }

        @Override
        public void request(long demand) {
            subscription.request(demand);
        }
    }

    return new AwsBackedSubscription();
}

/** Does nothing; this method releases no resources. */
@Override
public void close() {
    // intentionally empty
}


/**
 * Subscriber that accumulates every received buffer into a {@link ByteArrayOutputStream}
 * and completes the supplied future with that stream once the upstream completes.
 *
 * <p>Changes compared with the previous version:
 * <ul>
 *   <li>bytes are copied with plain {@link ByteBuffer} operations and written through
 *       {@code write(byte[], int, int)}, which declares no checked exception, so the
 *       "should never happen" {@code IOException} handler and the helper class that was
 *       never imported are no longer needed;</li>
 *   <li>demand is requested once ({@code Long.MAX_VALUE}) in {@code onSubscribe} instead
 *       of being re-requested after every element.</li>
 * </ul>
 */
private static class BaosSubscriber implements Subscriber<ByteBuffer> {
    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    private final CompletableFuture<ByteArrayOutputStream> streamFuture;

    /**
     * @param streamFuture future completed with the collected bytes, or exceptionally on error
     */
    private BaosSubscriber(CompletableFuture<ByteArrayOutputStream> streamFuture) {
        this.streamFuture = streamFuture;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        // Ask for everything up front; no further request calls are required afterwards.
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(ByteBuffer byteBuffer) {
        // Read through a duplicate so the caller's position and limit stay untouched.
        byte[] chunk = new byte[byteBuffer.remaining()];
        byteBuffer.duplicate().get(chunk);
        baos.write(chunk, 0, chunk.length);
    }

    @Override
    public void onError(Throwable t) {
        streamFuture.completeExceptionally(t);
    }

    @Override
    public void onComplete() {
        streamFuture.complete(baos);
    }
}

Чего мне здесь не хватает? Возврат future, который завершается значением null, соответствует спецификации SdkAsyncHttpClient, поэтому ясно, что HTTP-ответ нужно каким-то образом передать подписчику на стороне AWS, — но здесь я зашёл в тупик.

Редактировать: Просто нашел это через Google: https://github.com/rmcsoft/j11_aws_http_client/blob/63f05326990317c59f1863be55942054769b437e/src/main/java/com/rmcsoft/aws/http/proxy/BodyHandlerProxy.java - посмотрим, находятся ли ответы внутри.

1 Ответ

0 голосов
/ 15 октября 2019

Когда я задавал этот вопрос, я не знал, что эта задача уже решена. Никита Скорняков (@rmcsoft на GitHub) реализовал именно это — реализацию SdkAsyncHttpClient, использующую HTTP-клиент Java 11 (java.net.http). Её можно найти здесь: https://github.com/rmcsoft/j11_aws_http_client (лицензия MIT).

Для полноты картины привожу самостоятельную (которую вам, вероятно, никогда не следует использовать) реализацию на Java:

package com.dow.as2;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
import software.amazon.awssdk.utils.AttributeMap;

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

import static java.net.http.HttpClient.Version.HTTP_1_1;
import static java.net.http.HttpClient.Version.HTTP_2;
import static software.amazon.awssdk.http.Protocol.HTTP2;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_TIMEOUT;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.PROTOCOL;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.READ_TIMEOUT;

public class JavaAsyncHttpClient implements SdkAsyncHttpClient {
    private static final String CLIENT_NAME = "JavaNetAsyncHttpClient";
    private final HttpClient httpClient;

    private JavaAsyncHttpClient(AttributeMap options) {
        this.httpClient = HttpClient.newBuilder()
                .connectTimeout(options.get(CONNECTION_TIMEOUT))
                .version(options.get(PROTOCOL) == HTTP2 ? HTTP_2 : HTTP_1_1)
                .build();
    }

    /**
     * Creates a new, empty builder for this client.
     *
     * @return a fresh {@code DefaultBuilder}
     */
    public static Builder builder() {
        Builder fresh = new DefaultBuilder();
        return fresh;
    }

    /**
     * Creates a client from a fresh builder on which no option has been set.
     *
     * @return a {@link SdkAsyncHttpClient} produced by that builder's {@code build()}
     */
    public static SdkAsyncHttpClient create() {
        return builder().build();
    }

    /**
     * Converts the SDK request into a {@code java.net.http} request, sends it with
     * {@code HttpClient.sendAsync} and routes the response to the SDK response handler.
     *
     * <p>Headers are copied except {@code Content-Length} and {@code Host}. POST and PUT
     * stream their body from the request content publisher; DELETE, HEAD and OPTIONS carry
     * no body; a method not listed in the switch leaves the builder's method untouched.
     *
     * <p>Response headers reach the handler through {@code BodyHandlerProxy}; the body is
     * passed to {@code onStream} as a Reactive Streams publisher. On failure the handler's
     * {@code onError} is invoked <em>and</em> the returned future completes exceptionally.
     * The previous {@code exceptionally(... return null)} stage turned every failure into a
     * normal completion, so callers of the future could not tell success from failure.
     *
     * @param asyncExecuteRequest SDK request wrapper (request, content publisher, response handler)
     * @return a future completed with {@code null} once {@code onStream} has been called,
     *         or completed exceptionally if sending or dispatching the response failed
     * @throws UnsupportedOperationException if the request method is PATCH (thrown synchronously)
     */
    @Override
    public CompletableFuture<Void> execute(AsyncExecuteRequest asyncExecuteRequest) {
        SdkHttpRequest request = asyncExecuteRequest.request();
        HttpRequest.Builder requestBuilder = HttpRequest.newBuilder().uri(request.getUri());
        for (Map.Entry<String, List<String>> header : request.headers().entrySet()) {
            // avoid java.lang.IllegalArgumentException: restricted header name: "Content-Length"
            if (!header.getKey().equalsIgnoreCase("Content-Length") && !header.getKey().equalsIgnoreCase("Host")) {
                for (String headerVal : header.getValue()) {
                    requestBuilder = requestBuilder.header(header.getKey(), headerVal);
                }
            }
        }

        switch (request.method()) {
            case POST:
                requestBuilder = requestBuilder.POST(new BodyPublisherProxy(asyncExecuteRequest.requestContentPublisher()));
                break;
            case PUT:
                requestBuilder = requestBuilder.PUT(new BodyPublisherProxy(asyncExecuteRequest.requestContentPublisher()));
                break;
            case DELETE:
                requestBuilder = requestBuilder.DELETE();
                break;
            case HEAD:
                requestBuilder = requestBuilder.method("HEAD", HttpRequest.BodyPublishers.noBody());
                break;
            case PATCH:
                throw new UnsupportedOperationException("PATCH not supported");
            case OPTIONS:
                requestBuilder = requestBuilder.method("OPTIONS", HttpRequest.BodyPublishers.noBody());
                break;
        }

        // Fetch the handler once and share it between the header and body paths.
        var responseHandler = asyncExecuteRequest.responseHandler();
        var bodyHandler = new BodyHandlerProxy(responseHandler);
        return httpClient
                .sendAsync(requestBuilder.build(), bodyHandler)
                .thenApply(HttpResponse::body)
                .thenApply(this::toAwsPublisher)
                .thenAccept(responseHandler::onStream)
                // whenComplete keeps the exceptional outcome on the returned future,
                // unlike exceptionally(...), which would replace it with a null result.
                .whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        responseHandler.onError(failure);
                    }
                });
    }

    /**
     * Exposes a {@link Flow.Subscription} through the Reactive Streams {@code Subscription} interface.
     *
     * @param subscription the Flow subscription to delegate to
     * @return a view whose {@code request} and {@code cancel} calls are passed straight through
     */
    private Subscription toAwsSubscription(Flow.Subscription subscription) {
        final class FlowBackedSubscription implements Subscription {
            @Override
            public void cancel() {
                subscription.cancel();
            }

            @Override
            public void request(long demand) {
                subscription.request(demand);
            }
        }

        return new FlowBackedSubscription();
    }

    /**
     * Wraps a Reactive Streams subscriber so it can be subscribed to a {@link Flow.Publisher}.
     *
     * @param subscriber the Reactive Streams subscriber that ultimately receives the signals
     * @return a Flow subscriber forwarding every signal to {@code subscriber}
     */
    private Flow.Subscriber<? super ByteBuffer> toFlowSubscriber(Subscriber<? super ByteBuffer> subscriber) {
        final class AwsDelegatingSubscriber implements Flow.Subscriber<ByteBuffer> {
            @Override
            public void onSubscribe(Flow.Subscription upstream) {
                // the subscription has to be adapted in the opposite direction
                subscriber.onSubscribe(toAwsSubscription(upstream));
            }

            @Override
            public void onNext(ByteBuffer chunk) {
                subscriber.onNext(chunk);
            }

            @Override
            public void onError(Throwable failure) {
                subscriber.onError(failure);
            }

            @Override
            public void onComplete() {
                subscriber.onComplete();
            }
        }

        return new AwsDelegatingSubscriber();
    }

    /**
     * Presents a {@link Flow.Publisher} as a Reactive Streams {@code Publisher}.
     *
     * @param publisher the Flow publisher producing the buffers
     * @return a publisher that subscribes an adapted subscriber to {@code publisher}
     */
    private Publisher<ByteBuffer> toAwsPublisher(Flow.Publisher<ByteBuffer> publisher) {
        return awsSubscriber -> publisher.subscribe(toFlowSubscriber(awsSubscriber));
    }

    /** Does nothing; this method releases no resources. */
    @Override
    public void close() {
        // intentionally empty
    }

    /**
     * Reports the name of this client implementation.
     *
     * @return the value of {@code CLIENT_NAME}
     */
    @Override
    public String clientName() {
        return JavaAsyncHttpClient.CLIENT_NAME;
    }
    /**
     * Builder that collects options in an {@link AttributeMap.Builder} and creates the client
     * from them merged with service and global defaults.
     */
    private static final class DefaultBuilder implements Builder {
        private final AttributeMap.Builder standardOptions = AttributeMap.builder();

        private DefaultBuilder() {
        }

        /**
         * Stores the given duration under {@code READ_TIMEOUT}.
         *
         * <p>NOTE(review): the client constructor shown in this file does not read
         * {@code READ_TIMEOUT}, so this value appears to have no effect there.
         *
         * @param socketTimeout the timeout as a {@link Duration}
         * @return this object for method chaining
         */
        public Builder socketTimeout(Duration socketTimeout) {
            this.standardOptions.put(READ_TIMEOUT, socketTimeout);
            return this;
        }

        /**
         * Bean-style variant of {@link #socketTimeout(Duration)}.
         *
         * @param socketTimeout the timeout as a {@link Duration}
         */
        public void setSocketTimeout(Duration socketTimeout) {
            socketTimeout(socketTimeout);
        }

        /**
         * Stores the given duration under {@code CONNECTION_TIMEOUT}.
         *
         * @param connectionTimeout the timeout as a {@link Duration}
         * @return this object for method chaining
         */
        public Builder connectionTimeout(Duration connectionTimeout) {
            this.standardOptions.put(CONNECTION_TIMEOUT, connectionTimeout);
            return this;
        }

        /**
         * Bean-style variant of {@link #connectionTimeout(Duration)}.
         *
         * @param connectionTimeout the timeout as a {@link Duration}
         */
        public void setConnectionTimeout(Duration connectionTimeout) {
            connectionTimeout(connectionTimeout);
        }

        /**
         * Stores the given protocol under {@code PROTOCOL}.
         *
         * @param protocol the protocol to record
         * @return this object for method chaining
         */
        public Builder protocol(Protocol protocol) {
            this.standardOptions.put(PROTOCOL, protocol);
            return this;
        }

        /**
         * Builds the client from the collected options merged first with the supplied
         * service defaults and then with {@link SdkHttpConfigurationOption#GLOBAL_HTTP_DEFAULTS}.
         *
         * @param serviceDefaults service-specific defaults to merge with the collected options
         * @return a new {@link JavaAsyncHttpClient}
         */
        @Override
        public SdkAsyncHttpClient buildWithDefaults(AttributeMap serviceDefaults) {
            var explicitOptions = standardOptions.build();
            var withServiceDefaults = explicitOptions.merge(serviceDefaults);
            var resolvedOptions = withServiceDefaults.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS);
            return new JavaAsyncHttpClient(resolvedOptions);
        }
    }

    /**
     * Body handler that reports the response status and headers to the SDK response handler
     * and then supplies a {@code BodySubscriberProxy} to receive the body.
     */
    private static class BodyHandlerProxy implements HttpResponse.BodyHandler<Flow.Publisher<ByteBuffer>> {

        private final SdkAsyncHttpResponseHandler handler;

        /**
         * @param responseHandler SDK handler to notify; must not be {@code null}
         * @throws NullPointerException if {@code responseHandler} is {@code null}
         */
        private BodyHandlerProxy(SdkAsyncHttpResponseHandler responseHandler) {
            this.handler = Objects.requireNonNull(responseHandler);
        }

        /**
         * Passes the response info to {@code onHeaders} before creating the body subscriber.
         *
         * @param responseInfo status and headers of the response
         * @return a new body subscriber for this response
         */
        @Override
        public HttpResponse.BodySubscriber<Flow.Publisher<ByteBuffer>> apply(HttpResponse.ResponseInfo responseInfo) {
            SdkHttpHeadersProxy headersView = new SdkHttpHeadersProxy(responseInfo);
            handler.onHeaders(headersView);
            return new BodySubscriberProxy();
        }
    }

    /** Mutable holder for a single subscriber reference that can be read or cleared. */
    static final class SubscriberRef {

        Flow.Subscriber<? super ByteBuffer> ref;

        /**
         * @param subscriber the subscriber to hold initially; may be {@code null}
         */
        SubscriberRef(Flow.Subscriber<? super ByteBuffer> subscriber) {
            this.ref = subscriber;
        }

        /**
         * @return the currently held subscriber, or {@code null} after {@link #clear()}
         */
        Flow.Subscriber<? super ByteBuffer> get() {
            return this.ref;
        }

        /**
         * Drops the held reference.
         *
         * @return the subscriber that was held before the call, possibly {@code null}
         */
        Flow.Subscriber<? super ByteBuffer> clear() {
            final Flow.Subscriber<? super ByteBuffer> previous = get();
            this.ref = null;
            return previous;
        }
    }

    /**
     * Subscription handed to the downstream subscriber. It forwards demand to the wrapped
     * subscription only while the subscriber reference is still set, and clears that
     * reference on cancel.
     */
    static final class SubscriptionRef implements Flow.Subscription {

        final Flow.Subscription subscription;
        final SubscriberRef subscriberRef;

        /**
         * @param subscription  the wrapped upstream subscription
         * @param subscriberRef holder of the downstream subscriber
         */
        SubscriptionRef(Flow.Subscription subscription,
                        SubscriberRef subscriberRef) {
            this.subscriberRef = subscriberRef;
            this.subscription = subscription;
        }

        /** Forwards the demand unless the subscriber reference has been cleared. */
        @Override
        public void request(long n) {
            if (subscriberRef.get() == null) {
                return;
            }
            subscription.request(n);
        }

        /** Cancels the wrapped subscription, then clears the subscriber reference. */
        @Override
        public void cancel() {
            subscription.cancel();
            subscriberRef.clear();
        }

        /** Calls {@code onSubscribe(this)} on the held subscriber, if one is still set. */
        void subscribe() {
            Flow.Subscriber<?> target = subscriberRef.get();
            if (target == null) {
                return;
            }
            target.onSubscribe(this);
        }

        @Override
        public String toString() {
            String wrappedType = subscription.getClass().getName();
            int wrappedIdentity = System.identityHashCode(subscription);
            return "SubscriptionRef/" + wrappedType + "@" + wrappedIdentity;
        }
    }

    // Adapted from jdk.internal.net.http.ResponseSubscribers.PublishingBodySubscriber
    /**
     * Body subscriber whose "body" is itself a {@link Flow.Publisher} of {@link ByteBuffer}s.
     *
     * <p>The HTTP client pushes {@code List<ByteBuffer>} batches into this object; a single
     * downstream subscriber, registered through the published body, receives the buffers
     * one by one. Several futures coordinate the hand-off:
     * <ul>
     *   <li>{@code subscriptionCF} completes when the HTTP client calls {@code onSubscribe};</li>
     *   <li>{@code body} completes after {@code subscriptionCF}, with a publisher backed by
     *       {@code subscribe(...)};</li>
     *   <li>{@code subscribedCF} completes once the downstream subscriber has received its
     *       subscription;</li>
     *   <li>{@code completionCF} records the terminal signal (complete or error), which is
     *       relayed to the downstream subscriber once {@code subscribedCF} is done.</li>
     * </ul>
     *
     * <p>NOTE(review): every buffer of a batch is delivered as a separate downstream
     * {@code onNext}, while {@code SubscriptionRef.request(n)} forwards {@code n} unchanged.
     * A downstream subscriber that requests a bounded amount may therefore receive more
     * items than it asked for; confirm this is acceptable for the consumers involved.
     */
    private static class BodySubscriberProxy implements HttpResponse.BodySubscriber<Flow.Publisher<ByteBuffer>> {

        // Completed with the upstream subscription in onSubscribe().
        private final CompletableFuture<Flow.Subscription>
                subscriptionCF = new CompletableFuture<>();
        // Completed in subscribe() after the downstream subscriber got its subscription.
        private final CompletableFuture<SubscriberRef>
                subscribedCF = new CompletableFuture<>();
        // Holds the one accepted downstream subscriber; null until subscribe() succeeds.
        private AtomicReference<SubscriberRef>
                subscriberRef = new AtomicReference<>();
        // The body becomes available as soon as the upstream subscription exists.
        private final CompletableFuture<Flow.Publisher<ByteBuffer>> body =
                subscriptionCF.thenCompose(
                        (s) -> CompletableFuture.completedFuture(this::subscribe));

        // Terminal state of the upstream: normal completion or the error that ended it.
        private final CompletableFuture<Void> completionCF;

        BodySubscriberProxy() {
            completionCF = new CompletableFuture<>();
            // Relay the terminal signal only after a downstream subscriber is attached.
            completionCF.whenComplete(
                    (r, t) -> subscribedCF.thenAccept(s -> complete(s, t)));
        }

        /**
         * @return the stage that yields the publisher view of the response body
         */
        public CompletionStage<Flow.Publisher<ByteBuffer>> getBody() {
            return body;
        }


        // This is a callback for the subscribedCF.
        // Do not call directly!
        // Clears the subscriber reference and delivers onError(t) or onComplete() to it.
        private void complete(SubscriberRef ref, Throwable t) {
            Flow.Subscriber<?> s = ref.clear();
            // maybe null if subscription was cancelled
            if (s == null) {
                return;
            }
            if (t != null) {
                s.onError(t);
                return;
            }

            try {
                s.onComplete();
            } catch (Throwable x) {
                // a failure inside onComplete() is reported to the same subscriber
                s.onError(x);
            }
        }

        // Records an error as the terminal signal; a null error is replaced by an
        // IllegalArgumentException.
        private void signalError(Throwable err) {
            completionCF.completeExceptionally(err != null ? err : new IllegalArgumentException("null throwable"));
        }

        // Records normal completion as the terminal signal.
        private void signalComplete() {
            completionCF.complete(null);
        }

        // Publisher side: accepts exactly one downstream subscriber. Any later subscriber
        // receives a no-op subscription followed by an IllegalStateException.
        private void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
            if (subscriber == null) {
                throw new IllegalArgumentException("subscriber must not be null");
            }
            SubscriberRef ref = new SubscriberRef(subscriber);
            if (subscriberRef.compareAndSet(null, ref)) {
                // Runs once the upstream subscription is known.
                subscriptionCF.thenAccept((s) -> {
                    SubscriptionRef subscription = new SubscriptionRef(s, ref);
                    try {
                        subscription.subscribe();
                        subscribedCF.complete(ref);
                    } catch (Throwable t) {
                        // the downstream onSubscribe failed: cancel upstream and drop the subscriber
                        subscription.cancel();
                    }
                });
            } else {
                subscriber.onSubscribe(new Flow.Subscription() {
                    @Override
                    public void request(long n) {
                    }

                    @Override
                    public void cancel() {
                    }
                });
                subscriber.onError(new IllegalStateException("This publisher has already one subscriber"));
            }
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscriptionCF.complete(subscription);
        }

        @Override
        public void onNext(List<ByteBuffer> item) {
            try {
                // NOTE(review): subscriberRef.get() is null until subscribe() has accepted a
                // subscriber; if onNext could arrive earlier, ref.get() would throw a
                // NullPointerException that the catch below turns into an error signal.
                SubscriberRef ref = subscriberRef.get();
                Flow.Subscriber<? super ByteBuffer> subscriber = ref.get();
                if (subscriber != null) { // may be null if subscription was cancelled.
                    // fan the batch out as individual buffers
                    item.forEach(subscriber::onNext);
                }
            } catch (Throwable err) {
                signalError(err);
                subscriptionCF.thenAccept(Flow.Subscription::cancel);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            // onError can be called before request(1), and therefore can
            // be called before subscriberRef is set.
            signalError(throwable);
        }

        @Override
        public void onComplete() {
            // cannot be called before onSubscribe()
            if (!subscriptionCF.isDone()) {
                signalError(new InternalError("onComplete called before onSubscribed"));
            } else {
                // onComplete can be called before request(1),
                // and therefore can be called before subscriberRef
                // is set.
                signalComplete();
            }
        }
    }

    /**
     * Read-only {@link SdkHttpFullResponse} view over a {@link HttpResponse.ResponseInfo}.
     * It exposes the status code and headers; status text and content are always empty.
     */
    private static class SdkHttpHeadersProxy implements SdkHttpFullResponse {

        private final HttpResponse.ResponseInfo responseInfo;

        /**
         * @param responseInfo response info to expose; must not be {@code null}
         * @throws NullPointerException if {@code responseInfo} is {@code null}
         */
        private SdkHttpHeadersProxy(HttpResponse.ResponseInfo responseInfo) {
            this.responseInfo = Objects.requireNonNull(responseInfo);
        }

        @Override
        public int statusCode() {
            return responseInfo.statusCode();
        }

        /** Always empty: no status text is taken from the response info. */
        @Override
        public Optional<String> statusText() {
            return Optional.empty();
        }

        @Override
        public Map<String, List<String>> headers() {
            return responseInfo.headers().map();
        }

        /** Always empty here; the body is delivered separately as a stream. */
        @Override
        public Optional<AbortableInputStream> content() {
            return Optional.empty(); // will be available at later stage
        }

        /**
         * @return a builder pre-populated with this view's headers and status code
         */
        @Override
        public Builder toBuilder() {
            Map<String, List<String>> headerMap = headers();
            int status = statusCode();
            return SdkHttpResponse
                    .builder()
                    .headers(headerMap)
                    .statusCode(status);
        }
    }

    /**
     * Request body publisher backed by the SDK content publisher. It is an inner class
     * because it uses the enclosing instance's subscriber adapter.
     */
    private class BodyPublisherProxy implements HttpRequest.BodyPublisher {
        private final SdkHttpContentPublisher publisher;

        /**
         * @param publisher SDK publisher producing the request body; must not be {@code null}
         * @throws NullPointerException if {@code publisher} is {@code null}
         */
        private BodyPublisherProxy(SdkHttpContentPublisher publisher) {
            this.publisher = Objects.requireNonNull(publisher);
        }

        /**
         * @return the length reported by the SDK publisher, or {@code -1} when it reports none
         */
        @Override
        public long contentLength() {
            Optional<Long> declaredLength = publisher.contentLength();
            if (declaredLength.isPresent()) {
                return declaredLength.get();
            }
            return -1L;
        }

        /** Subscribes an adapted version of the given Flow subscriber to the SDK publisher. */
        @Override
        public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
            Subscriber<? super ByteBuffer> adapted = toAwsSubscriber(subscriber);
            publisher.subscribe(adapted);
        }
    }

    /**
     * Exposes a Reactive Streams {@code Subscription} through the {@link Flow.Subscription} interface.
     *
     * @param subscription the Reactive Streams subscription to delegate to
     * @return a view whose {@code request} and {@code cancel} calls are passed straight through
     */
    private Flow.Subscription toFlowSubscription(Subscription subscription) {
        final class AwsBackedSubscription implements Flow.Subscription {
            @Override
            public void cancel() {
                subscription.cancel();
            }

            @Override
            public void request(long demand) {
                subscription.request(demand);
            }
        }

        return new AwsBackedSubscription();
    }

    /**
     * Wraps a {@link Flow.Subscriber} so it can be subscribed to a Reactive Streams publisher.
     *
     * @param subscriber the Flow subscriber that ultimately receives the signals
     * @return a Reactive Streams subscriber forwarding every signal to {@code subscriber}
     */
    private Subscriber<? super ByteBuffer> toAwsSubscriber(Flow.Subscriber<? super ByteBuffer> subscriber) {
        final class FlowDelegatingSubscriber implements Subscriber<ByteBuffer> {
            @Override
            public void onSubscribe(Subscription upstream) {
                // the subscription has to be adapted in the opposite direction
                subscriber.onSubscribe(toFlowSubscription(upstream));
            }

            @Override
            public void onNext(ByteBuffer chunk) {
                subscriber.onNext(chunk);
            }

            @Override
            public void onError(Throwable failure) {
                subscriber.onError(failure);
            }

            @Override
            public void onComplete() {
                subscriber.onComplete();
            }
        }

        return new FlowDelegatingSubscriber();
    }
}

Я рекомендую использовать j11_aws_http_client, ссылка на который приведена выше, а не это чудовище (оно, например, обрабатывает только часть запрещённых заголовков). Приведённый выше код почти полностью скопирован из этого проекта на GitHub.

Реализацию можно было бы значительно упростить, если бы нашёлся способ использовать java.net.http.HttpResponse.BodySubscribers.ofPublisher (он выдаёт Flow.Publisher<List<ByteBuffer>>).

...