Мне неизвестно, когда я задал этот вопрос - это основание уже наступило. Никита Скорняков (@rmcsoft на Github) реализовал именно эту вещь (SdkAsyncHttpClient
реализация, использующая HTTP-клиент Java 11 (java.net.http). Его можно найти здесь: https://github.com/rmcsoft/j11_aws_http_client (MIT лицензирован).
Для завершения здесь есть отдельная (которую вы, вероятно, никогда не должны использовать) Java-реализация:
package com.dow.as2;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
import software.amazon.awssdk.utils.AttributeMap;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import static java.net.http.HttpClient.Version.HTTP_1_1;
import static java.net.http.HttpClient.Version.HTTP_2;
import static software.amazon.awssdk.http.Protocol.HTTP2;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_TIMEOUT;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.PROTOCOL;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.READ_TIMEOUT;
public class JavaAsyncHttpClient implements SdkAsyncHttpClient {
private static final String CLIENT_NAME = "JavaNetAsyncHttpClient";
private final HttpClient httpClient;
private JavaAsyncHttpClient(AttributeMap options) {
this.httpClient = HttpClient.newBuilder()
.connectTimeout(options.get(CONNECTION_TIMEOUT))
.version(options.get(PROTOCOL) == HTTP2 ? HTTP_2 : HTTP_1_1)
.build();
}
public static Builder builder() {
return new DefaultBuilder();
}
/**
* Create a {@link HttpClient} client with the default properties
*
* @return a {@link JavaHttpClient}
*/
public static SdkAsyncHttpClient create() {
return new DefaultBuilder().build();
}
@Override
public CompletableFuture<Void> execute(AsyncExecuteRequest asyncExecuteRequest) {
SdkHttpRequest request = asyncExecuteRequest.request();
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder().uri(request.getUri());
for (Map.Entry<String, List<String>> header : request.headers().entrySet()) {
// avoid java.lang.IllegalArgumentException: restricted header name: "Content-Length"
if (!header.getKey().equalsIgnoreCase("Content-Length") && !header.getKey().equalsIgnoreCase("Host")) {
for (String headerVal : header.getValue()) {
requestBuilder = requestBuilder.header(header.getKey(), headerVal);
}
}
}
switch (request.method()) {
case POST:
requestBuilder = requestBuilder.POST(new BodyPublisherProxy(asyncExecuteRequest.requestContentPublisher()));
break;
case PUT:
requestBuilder = requestBuilder.PUT(new BodyPublisherProxy(asyncExecuteRequest.requestContentPublisher()));
break;
case DELETE:
requestBuilder = requestBuilder.DELETE();
break;
case HEAD:
requestBuilder = requestBuilder.method("HEAD", HttpRequest.BodyPublishers.noBody());
break;
case PATCH:
throw new UnsupportedOperationException("PATCH not supported");
case OPTIONS:
requestBuilder = requestBuilder.method("OPTIONS", HttpRequest.BodyPublishers.noBody());
break;
}
// Need to use BodyHandlers.ofPublisher() or is that a dead end? How can link up the AWS Publisher/Subscribers
// with HttpClient sendAsync Flow.Publishers/Flow.Subscriber?
var responseHandler = asyncExecuteRequest.responseHandler();
var bodyHandler = new BodyHandlerProxy(asyncExecuteRequest.responseHandler());
return httpClient
.sendAsync(requestBuilder.build(), bodyHandler)
.thenApply(HttpResponse::body)
.thenApply(this::toAwsPublisher)
.thenAccept(responseHandler::onStream)
.exceptionally(t -> {
responseHandler.onError(t);
return null;
});
}
private Subscription toAwsSubscription(Flow.Subscription subscription) {
return new Subscription() {
@Override
public void request(long n) {
subscription.request(n);
}
@Override
public void cancel() {
subscription.cancel();
}
};
}
private Flow.Subscriber<? super ByteBuffer> toFlowSubscriber(Subscriber<? super ByteBuffer> subscriber) {
return new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscriber.onSubscribe(toAwsSubscription(subscription));
}
@Override
public void onNext(ByteBuffer item) {
subscriber.onNext(item);
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
};
}
private Publisher<ByteBuffer> toAwsPublisher(Flow.Publisher<ByteBuffer> publisher) {
return new Publisher<>() {
@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
publisher.subscribe(toFlowSubscriber(s));
}
};
}
@Override
public void close() {
}
@Override
public String clientName() {
return CLIENT_NAME;
}
private static final class DefaultBuilder implements Builder {
private final AttributeMap.Builder standardOptions = AttributeMap.builder();
private DefaultBuilder() {
}
/**
* Sets the read timeout to a specified timeout. A timeout of zero is interpreted as an infinite timeout.
*
* @param socketTimeout the timeout as a {@link Duration}
* @return this object for method chaining
*/
public Builder socketTimeout(Duration socketTimeout) {
standardOptions.put(READ_TIMEOUT, socketTimeout);
return this;
}
public void setSocketTimeout(Duration socketTimeout) {
socketTimeout(socketTimeout);
}
/**
* Sets the connect timeout to a specified timeout. A timeout of zero is interpreted as an infinite timeout.
*
* @param connectionTimeout the timeout as a {@link Duration}
* @return this object for method chaining
*/
public Builder connectionTimeout(Duration connectionTimeout) {
standardOptions.put(CONNECTION_TIMEOUT, connectionTimeout);
return this;
}
public void setConnectionTimeout(Duration connectionTimeout) {
connectionTimeout(connectionTimeout);
}
public Builder protocol(Protocol protocol) {
standardOptions.put(PROTOCOL, protocol);
return this;
}
/**
* Used by the SDK to create a {@link SdkAsyncHttpClient} with service-default values if no other values have been configured
*
* @param serviceDefaults Service specific defaults. Keys will be one of the constants defined in
* {@link SdkHttpConfigurationOption}.
* @return an instance of {@link SdkAsyncHttpClient}
*/
@Override
public SdkAsyncHttpClient buildWithDefaults(AttributeMap serviceDefaults) {
return new JavaAsyncHttpClient(standardOptions.build()
.merge(serviceDefaults)
.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS));
}
}
private static class BodyHandlerProxy implements HttpResponse.BodyHandler<Flow.Publisher<ByteBuffer>> {
private final SdkAsyncHttpResponseHandler handler;
private BodyHandlerProxy(SdkAsyncHttpResponseHandler responseHandler) {
Objects.requireNonNull(responseHandler);
handler = responseHandler;
}
@Override
public HttpResponse.BodySubscriber<Flow.Publisher<ByteBuffer>> apply(HttpResponse.ResponseInfo responseInfo) {
handler.onHeaders(new SdkHttpHeadersProxy(responseInfo));
return new BodySubscriberProxy();
}
}
static final class SubscriberRef {
Flow.Subscriber<? super ByteBuffer> ref;
SubscriberRef(Flow.Subscriber<? super ByteBuffer> subscriber) {
ref = subscriber;
}
Flow.Subscriber<? super ByteBuffer> get() {
return ref;
}
Flow.Subscriber<? super ByteBuffer> clear() {
Flow.Subscriber<? super ByteBuffer> res = ref;
ref = null;
return res;
}
}
static final class SubscriptionRef implements Flow.Subscription {
final Flow.Subscription subscription;
final SubscriberRef subscriberRef;
SubscriptionRef(Flow.Subscription subscription,
SubscriberRef subscriberRef) {
this.subscription = subscription;
this.subscriberRef = subscriberRef;
}
@Override
public void request(long n) {
if (subscriberRef.get() != null) {
subscription.request(n);
}
}
@Override
public void cancel() {
subscription.cancel();
subscriberRef.clear();
}
void subscribe() {
Flow.Subscriber<?> subscriber = subscriberRef.get();
if (subscriber != null) {
subscriber.onSubscribe(this);
}
}
@Override
public String toString() {
return String
.format("SubscriptionRef/%s@%s", subscription.getClass().getName(), System.identityHashCode(subscription));
}
}
// Adapted from jdk.internal.net.http.ResponseSubscribers.PublishingBodySubscriber
private static class BodySubscriberProxy implements HttpResponse.BodySubscriber<Flow.Publisher<ByteBuffer>> {
private final CompletableFuture<Flow.Subscription>
subscriptionCF = new CompletableFuture<>();
private final CompletableFuture<SubscriberRef>
subscribedCF = new CompletableFuture<>();
private AtomicReference<SubscriberRef>
subscriberRef = new AtomicReference<>();
private final CompletableFuture<Flow.Publisher<ByteBuffer>> body =
subscriptionCF.thenCompose(
(s) -> CompletableFuture.completedFuture(this::subscribe));
private final CompletableFuture<Void> completionCF;
BodySubscriberProxy() {
completionCF = new CompletableFuture<>();
completionCF.whenComplete(
(r, t) -> subscribedCF.thenAccept(s -> complete(s, t)));
}
public CompletionStage<Flow.Publisher<ByteBuffer>> getBody() {
return body;
}
// This is a callback for the subscribedCF.
// Do not call directly!
private void complete(SubscriberRef ref, Throwable t) {
Flow.Subscriber<?> s = ref.clear();
// maybe null if subscription was cancelled
if (s == null) {
return;
}
if (t != null) {
s.onError(t);
return;
}
try {
s.onComplete();
} catch (Throwable x) {
s.onError(x);
}
}
private void signalError(Throwable err) {
completionCF.completeExceptionally(err != null ? err : new IllegalArgumentException("null throwable"));
}
private void signalComplete() {
completionCF.complete(null);
}
private void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
if (subscriber == null) {
throw new IllegalArgumentException("subscriber must not be null");
}
SubscriberRef ref = new SubscriberRef(subscriber);
if (subscriberRef.compareAndSet(null, ref)) {
subscriptionCF.thenAccept((s) -> {
SubscriptionRef subscription = new SubscriptionRef(s, ref);
try {
subscription.subscribe();
subscribedCF.complete(ref);
} catch (Throwable t) {
subscription.cancel();
}
});
} else {
subscriber.onSubscribe(new Flow.Subscription() {
@Override
public void request(long n) {
}
@Override
public void cancel() {
}
});
subscriber.onError(new IllegalStateException("This publisher has already one subscriber"));
}
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscriptionCF.complete(subscription);
}
@Override
public void onNext(List<ByteBuffer> item) {
try {
SubscriberRef ref = subscriberRef.get();
Flow.Subscriber<? super ByteBuffer> subscriber = ref.get();
if (subscriber != null) { // may be null if subscription was cancelled.
item.forEach(subscriber::onNext);
}
} catch (Throwable err) {
signalError(err);
subscriptionCF.thenAccept(Flow.Subscription::cancel);
}
}
@Override
public void onError(Throwable throwable) {
// onError can be called before request(1), and therefore can
// be called before subscriberRef is set.
signalError(throwable);
}
@Override
public void onComplete() {
// cannot be called before onSubscribe()
if (!subscriptionCF.isDone()) {
signalError(new InternalError("onComplete called before onSubscribed"));
} else {
// onComplete can be called before request(1),
// and therefore can be called before subscriberRef
// is set.
signalComplete();
}
}
}
private static class SdkHttpHeadersProxy implements SdkHttpFullResponse {
private final HttpResponse.ResponseInfo responseInfo;
private SdkHttpHeadersProxy(HttpResponse.ResponseInfo responseInfo) {
Objects.requireNonNull(responseInfo);
this.responseInfo = responseInfo;
}
@Override
public Optional<String> statusText() {
return Optional.empty();
}
@Override
public int statusCode() {
return responseInfo.statusCode();
}
@Override
public Map<String, List<String>> headers() {
return responseInfo.headers().map();
}
@Override
public Builder toBuilder() {
return SdkHttpResponse
.builder()
.headers(headers())
.statusCode(statusCode());
}
@Override
public Optional<AbortableInputStream> content() {
return Optional.empty(); // will be available at later stage
}
}
private class BodyPublisherProxy implements HttpRequest.BodyPublisher {
private final SdkHttpContentPublisher publisher;
private BodyPublisherProxy(SdkHttpContentPublisher publisher) {
Objects.requireNonNull(publisher);
this.publisher = publisher;
}
@Override
public long contentLength() {
return publisher.contentLength().orElse(-1L);
}
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
publisher.subscribe(toAwsSubscriber(subscriber));
}
}
private Flow.Subscription toFlowSubscription(Subscription subscription) {
return new Flow.Subscription() {
@Override
public void request(long n) {
subscription.request(n);
}
@Override
public void cancel() {
subscription.cancel();
}
};
}
private Subscriber<? super ByteBuffer> toAwsSubscriber(Flow.Subscriber<? super ByteBuffer> subscriber) {
return new Subscriber<>() {
@Override
public void onSubscribe(Subscription s) {
subscriber.onSubscribe(toFlowSubscription(s));
}
@Override
public void onNext(ByteBuffer byteBuffer) {
subscriber.onNext(byteBuffer);
}
@Override
public void onError(Throwable t) {
subscriber.onError(t);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
};
}
}
Я рекомендую использовать j11_aws_http_client
, связанный ранее с этим чудовищем (он обрабатывает тольконапример, часть ограниченных заголовков.) Приведенный выше код почти полностью скопирован и вставлен из этого проекта Github.
Реализация может быть значительно упрощена, если есть способ использовать java.net.http. BodySubscribeers.ofPublisher (который является Flow.Publisher<List<ByteBuffer>>>
).