Двунаправленная потоковая передача в Android, вызывающая OOM - PullRequest
0 голосов
/ 29 апреля 2019

Я настроил двунаправленную потоковую конструкцию в приложении для Android, где в настоящее время я использую механизм для отправки кусков больших файлов. Проблема, с которой я столкнулся, заключается в том, что мое приложение получит сообщение с запросом на файл, а затем я отвечу потенциально ответными сообщениями GRPC на сотни МБ, что часто приводит к OOM. Псевдокод:

public class Myclass implements StreamObserver<CameraRequest>, Closeable {
  ...

  public void onNext(Request req) {
    for (Chunk chunk : getChunks(req))
      this.requestObserver.onNext(builder.setChunk(chunk).build());
  }

  ...
}

Есть ли какой-нибудь хороший способ ограничить количество незавершенных вызовов onNext на основе того, что фактически было установлено на провод (и соответствующая память стала свободной)? IE разрешает только 10 вызовов onNext, затем последующие блокируются, пока данные для предыдущих вызовов не будут успешно отправлены базовым стеком протоколов? Я мог бы реализовать полное окно подтверждения e2e в своем проводном протоколе TCP, но надеялся, что есть более простая / встроенная техника, которую используют другие.

Спасибо!

Ответы [ 2 ]

0 голосов
/ 07 мая 2019

Вы можете проверить пример управления потоком здесь .

0 голосов
/ 06 мая 2019

В ролях requestObserver до ClientCallStreamObserver.Затем вы можете позвонить clientCallStreamObserver.isReady(), чтобы проверить, следует ли прекратить отправку.

Затем вам понадобятся уведомления о том, когда RPC будет готов для дополнительных сообщений, чтобы возобновить отправку.Для этого внедрите ClientResponseObserver и позвоните clientCallStreamObserver.setOnReadyHandler(Runnable) в пределах beforeStart().

Собрав все это вместе, вы получите нечто вроде:

public class MyClass implements
    ClientResponseObserver<CameraRequest,CameraResponse> {
  private ClientCallStreamObserver<CameraRequest> requestObserver;
  private Iterable<Chunk> chunks;

  public void beforeStart(ClientCallStreamObserver<CameraRequest> requestObserver) {
    this.requestObserver = requestObserver;
    requestObserver.setOnReadyHandler(MyClass::drain);
  }

  public void onNext(CameraRequest req) {
    // I don't know if this assert valid for your protocol
    assert chunks == null || !chunks.hasNext();
    chunks = getChunks(req);
    drain();
  }

  public void drain() {
    while (requestObserver.isReady() && chunks.hasNext()) {
      Chunk chunk = chunks.next();
      requestObserver.onNext(builder.setChunk(chunk).build());
    }
  }

  ...
}
...