RxJava2 Параллельная загрузка - PullRequest
0 голосов
/ 01 июля 2018

Привет. Я пытаюсь загрузить файлы параллельно, используя rxjava2, но кое-как, как это не работает, мой код ниже:

private Flowable<BinaryDownlodable> downloadFile(List<BinaryDownlodable> binaryDownlodables) throws IOException {
    return Flowable.fromIterable(binaryDownlodables)
        .flatMap((BinaryDownlodable downlodable) -> {
          return Flowable.fromCallable(new Callable<BinaryDownlodable>() {

            @Override
            public BinaryDownlodable call() throws Exception {
              System.out.println("Starting: " + downlodable.remote());
              final Request request = new Request.Builder()
                  .cacheControl(CacheControl.FORCE_NETWORK)
                  .url(downlodable.remote())
                  .build();
              final Response response = okHttpClient.newCall(request).execute();
              final InputStream inputStream = response.body().byteStream();
              final File newFile = new File(downlodable.local());
              final byte[] buff = new byte[4096];
              long downloaded = 0;
              final long target = response.body().contentLength();
              final String totalSize = FileUtils.readableFileSize(target);
              try (OutputStream outStream = new FileOutputStream(newFile)) {
                while (true) {
                  int read = inputStream.read(buff);
                  if (read == -1) {
                    break;
                  }
                  outStream.write(buff, 0, read);
                  //write buff
                  downloaded += read;
                }
                updateDbItem(downlodable, downloaded, target);
              }
              return downlodable;
            }
          });
        }, 3);
  }

BinaryDownloadable.java

public final class BinaryDownlodable implements Downloable {



  private String urlLocal;
  private String urlRemote;
  private boolean completed;
  private Object item;

  public BinaryDownlodable(String urlLocal, String urlRemote) {
    this.urlLocal = urlLocal;
    this.urlRemote = urlRemote;
  }



  public Object getItem() {
    return item;
  }

  public void setItem(Object item) {
    this.item = item;
  }

  @Override
  public String local() {
    return urlLocal;
  }

  @Override
  public String remote() {
    return urlRemote;
  }

  @Override
  public void completed(boolean completed) {
    this.completed = completed;
  }

  public String getUrlLocal() {
    return urlLocal;
  }

  public String getUrlRemote() {
    return urlRemote;
  }

  public boolean isCompleted() {
    return completed;
  }



  @Override
  public String toString() {
    return "BinaryDownlodable{" +
        "urlLocal='" + urlLocal + '\'' +
        ", urlRemote='" + urlRemote + '\'' +
        '}';
  }
}

Вот как я звоню:

downloadFile(binaryDownlodables)
                  .subscribeOn(Schedulers.io())
                  .observeOn(AndroidSchedulers.mainThread())
                  .subscribe(new Consumer<BinaryDownlodable>() {
                    @Override
                    public void accept(BinaryDownlodable binaryDownlodable) throws Exception {
                      System.out.println("Accepted");
                    }
                  });

Ответы [ 2 ]

0 голосов
/ 01 июля 2018

Публикация этого из моего комментария, как он вам помог,

Попробуйте дать Flowable.fromCallable(...).subscribeOn(Schedulers.newThread()). Я думаю, что все ваши Flowables внутри flatMap последовательно выполняются в фоновом потоке ввода-вывода

0 голосов
/ 01 июля 2018

Извините за нетерпение, вот как это решается, я забыл добавить .subscribeOn(Schedulers.io())

private Flowable<BinaryDownlodable> downloadFile(List<BinaryDownlodable> binaryDownlodables) throws IOException {
    return Flowable.fromIterable(binaryDownlodables)
        .flatMap((BinaryDownlodable downlodable) -> {
          return Flowable.fromCallable(new Callable<BinaryDownlodable>() {

            @Override
            public BinaryDownlodable call() throws Exception {
              System.out.println("Starting: " + downlodable.remote());
              final Request request = new Request.Builder()
                  .cacheControl(CacheControl.FORCE_NETWORK)
                  .url(downlodable.remote())
                  .build();
              final Response response = okHttpClient.newCall(request).execute();
              final InputStream inputStream = response.body().byteStream();
              final File newFile = new File(downlodable.local());
              final byte[] buff = new byte[4096];
              long downloaded = 0;
              final long target = response.body().contentLength();
              final String totalSize = FileUtils.readableFileSize(target);
              try (OutputStream outStream = new FileOutputStream(newFile)) {
                while (true) {
                  int read = inputStream.read(buff);
                  if (read == -1) {
                    break;
                  }
                  outStream.write(buff, 0, read);
                  //write buff
                  downloaded += read;
                }
                updateDbItem(downlodable, downloaded, target);
              }
              return downlodable;
            }
          })
              .retryWhen(new RetryWithDelay(5, 3000))
              .subscribeOn(Schedulers.io());
        }, 3);
  }
...