Издатель реактивных потоков из тела ответа веб-клиента Vertx - PullRequest
0 голосов
/ 26 марта 2020

Я пытаюсь написать оболочку для Vertx веб-клиента для загрузки тела ответа с сервера, используя Publisher из реактивных потоков:

import org.reactivestreams.Publisher;
import io.vertx.reactivex.ext.web.client.WebClient;

interface Storage {
  Publisher<ByteBuffer> load(String key);
}

class WebStorage implements Storage {
  private final WebClient client;

  public WebStorage(final WebClient client) {
    this.client = client;
  }

  @Override
  public Publisher<ByteBuffer> laod(final String key) {
    return client.get(String.format("https://myhost/path?query=%s", key))
      .rxSend()
      .toFlowable()
      .map(resp -> ByteBuffer.wrap(resp.body().getBytes()));
  }
}

Это решение не является правильным, поскольку он читает все байты тела блокирующим способом с помощью вызова getBytes().

Возможно ли прочитать ответ из Vertx WebClient порциями и преобразовать его в Publisher (или Rx Flowable)?

Ответы [ 2 ]

1 голос
/ 26 марта 2020

Я думаю, вы можете использовать ByteCode c .pipe :

import io.reactivex.Flowable;
import io.vertx.ext.reactivestreams.ReactiveWriteStream;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.streams.WriteStream;
import io.vertx.reactivex.ext.web.client.WebClient;
import io.vertx.reactivex.ext.web.codec.BodyCodec;
import org.reactivestreams.Publisher;

import java.nio.ByteBuffer;

interface Storage {
    Publisher<ByteBuffer> load(String key);
}

class WebStorage implements Storage {
    private final Vertx vertx = Vertx.vertx();
    private final WebClient client;

    public WebStorage(final WebClient client) {
        this.client = client;
    }

    @Override
    public Publisher<ByteBuffer> load(final String key) {
        final ReactiveWriteStream<Buffer> stream = ReactiveWriteStream.writeStream(vertx.getDelegate());
        client.get(String.format("https://myhost/path?query=%s", key))
            .as(BodyCodec.pipe(WriteStream.newInstance(stream)))
            .rxSend().subscribe();
        return Flowable.fromPublisher(stream).map(buffer -> ByteBuffer.wrap(buffer.getBytes()));
    }
}
1 голос
/ 26 марта 2020

Веб-клиент Vert.x не предназначен для потоковой передачи тела ответа. Он буферизует содержимое по своему дизайну.

Если вы хотите транслировать контент, вы можете использовать базовый HTTP-клиент, который является более гибким.

...