Привет. Я пытаюсь загрузить файлы параллельно, используя rxjava2, но кое-как, как это не работает, мой код ниже:
private Flowable<BinaryDownlodable> downloadFile(List<BinaryDownlodable> binaryDownlodables) throws IOException {
return Flowable.fromIterable(binaryDownlodables)
.flatMap((BinaryDownlodable downlodable) -> {
return Flowable.fromCallable(new Callable<BinaryDownlodable>() {
@Override
public BinaryDownlodable call() throws Exception {
System.out.println("Starting: " + downlodable.remote());
final Request request = new Request.Builder()
.cacheControl(CacheControl.FORCE_NETWORK)
.url(downlodable.remote())
.build();
final Response response = okHttpClient.newCall(request).execute();
final InputStream inputStream = response.body().byteStream();
final File newFile = new File(downlodable.local());
final byte[] buff = new byte[4096];
long downloaded = 0;
final long target = response.body().contentLength();
final String totalSize = FileUtils.readableFileSize(target);
try (OutputStream outStream = new FileOutputStream(newFile)) {
while (true) {
int read = inputStream.read(buff);
if (read == -1) {
break;
}
outStream.write(buff, 0, read);
//write buff
downloaded += read;
}
updateDbItem(downlodable, downloaded, target);
}
return downlodable;
}
});
}, 3);
}
BinaryDownloadable.java
public final class BinaryDownlodable implements Downloable {
private String urlLocal;
private String urlRemote;
private boolean completed;
private Object item;
public BinaryDownlodable(String urlLocal, String urlRemote) {
this.urlLocal = urlLocal;
this.urlRemote = urlRemote;
}
public Object getItem() {
return item;
}
public void setItem(Object item) {
this.item = item;
}
@Override
public String local() {
return urlLocal;
}
@Override
public String remote() {
return urlRemote;
}
@Override
public void completed(boolean completed) {
this.completed = completed;
}
public String getUrlLocal() {
return urlLocal;
}
public String getUrlRemote() {
return urlRemote;
}
public boolean isCompleted() {
return completed;
}
@Override
public String toString() {
return "BinaryDownlodable{" +
"urlLocal='" + urlLocal + '\'' +
", urlRemote='" + urlRemote + '\'' +
'}';
}
}
Вот как я звоню:
downloadFile(binaryDownlodables)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<BinaryDownlodable>() {
@Override
public void accept(BinaryDownlodable binaryDownlodable) throws Exception {
System.out.println("Accepted");
}
});