Android RxAndroid: file download runs on the main thread
March 18, 2020

I have just taken over an Android app developed by someone else. They used RxJava and a lot of Observables. I get the following error when I run the app:

03-18 07:31:04.913  4102  4102 W System.err: android.os.NetworkOnMainThreadException
03-18 07:31:04.913  4102  4102 W System.err:    at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1303)
03-18 07:31:04.914  4102  4102 W System.err:    at com.android.org.conscrypt.Platform.blockGuardOnNetwork(Platform.java:300)
03-18 07:31:04.914  4102  4102 W System.err:    at com.android.org.conscrypt.OpenSSLSocketImpl$SSLOutputStream.write(OpenSSLSocketImpl.java:808)
03-18 07:31:04.914  4102  4102 W System.err:    at okio.Okio$1.write(Okio.java:79)
03-18 07:31:04.914  4102  4102 W System.err:    at okio.AsyncTimeout$1.write(AsyncTimeout.java:180)
03-18 07:31:04.914  4102  4102 W System.err:    at okio.RealBufferedSink.flush(RealBufferedSink.java:224)
03-18 07:31:04.914  4102  4102 W System.err:    at okhttp3.internal.http2.Http2Writer.rstStream(Http2Writer.java:152)
03-18 07:31:04.914  4102  4102 W System.err:    at okhttp3.internal.http2.Http2Connection.writeSynReset(Http2Connection.java:342)
03-18 07:31:04.914  4102  4102 W System.err:    at okhttp3.internal.http2.Http2Stream.close(Http2Stream.java:243)
03-18 07:31:04.914  4102  4102 W System.err:    at okhttp3.internal.http2.Http2Stream.cancelStreamIfNecessary(Http2Stream.java:516)
03-18 07:31:04.914  4102  4102 W System.err:    at okhttp3.internal.http2.Http2Stream$FramingSource.close(Http2Stream.java:494)
03-18 07:31:04.914  4102  4102 W System.err:    at okio.ForwardingSource.close(ForwardingSource.java:43)
03-18 07:31:04.914  4102  4102 W System.err:    at okhttp3.internal.http2.Http2Codec$StreamFinishingSource.close(Http2Codec.java:217)
03-18 07:31:04.914  4102  4102 W System.err:    at okio.RealBufferedSource.close(RealBufferedSource.java:468)
03-18 07:31:04.914  4102  4102 W System.err:    at okio.RealBufferedSource$1.close(RealBufferedSource.java:452)
03-18 07:31:04.914  4102  4102 W System.err:    at com.android.tools.profiler.support.network.HttpTracker$InputStreamTracker.close(HttpTracker.java:76)
03-18 07:31:04.914  4102  4102 W System.err:    at okio.Okio$2.close(Okio.java:152)
03-18 07:31:04.915  4102  4102 W System.err:    at okio.RealBufferedSource.close(RealBufferedSource.java:468)
03-18 07:31:04.915  4102  4102 W System.err:    at okio.ForwardingSource.close(ForwardingSource.java:43)
03-18 07:31:04.915  4102  4102 W System.err:    at okio.RealBufferedSource.close(RealBufferedSource.java:468)
03-18 07:31:04.915  4102  4102 W System.err:    at com.rahul.player.homescreen.data.IMainRepository.writeResponseBodyToDisk(IMainRepository.java:407)
03-18 07:31:04.915  4102  4102 W System.err:    at com.rahul.player.homescreen.data.IMainRepository.lambda$downloadMedia$11$IMainRepository(IMainRepository.java:319)
03-18 07:31:04.915  4102  4102 W System.err:    at com.rahul.player.homescreen.data.-$$Lambda$IMainRepository$4EieSFvRA5n3fut6dGA08iJETXE.apply(lambda)
03-18 07:31:04.915  4102  4102 W System.err:    at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:121)
03-18 07:31:04.915  4102  4102 W System.err:    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:63)
03-18 07:31:04.915  4102  4102 W System.err:    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onNext(ObservableCreate.java:67)
03-18 07:31:04.915  4102  4102 W System.err:    at com.rahul.player.di.data.ApiManager$3.onResponse(ApiManager.java:101)
03-18 07:31:04.915  4102  4102 W System.err:    at retrofit2.ExecutorCallAdapterFactory$ExecutorCallbackCall$1$1.run(ExecutorCallAdapterFactory.java:71)
03-18 07:31:04.915  4102  4102 W System.err:    at android.os.Handler.handleCallback(Handler.java:751)
03-18 07:31:04.915  4102  4102 W System.err:    at android.os.Handler.dispatchMessage(Handler.java:95)
03-18 07:31:04.915  4102  4102 W System.err:    at android.os.Looper.loop(Looper.java:154)
03-18 07:31:04.915  4102  4102 W System.err:    at android.app.ActivityThread.main(ActivityThread.java:6119)
03-18 07:31:04.915  4102  4102 W System.err:    at java.lang.reflect.Method.invoke(Native Method)
03-18 07:31:04.915  4102  4102 W System.err:    at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886)
03-18 07:31:04.915  4102  4102 W System.err:    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776)

I have the following code.

In the ViewModel:

mGetStuffsUsecase.execute(new DisposableSingleObserver<Stuff>() {

@Override
public void onSuccess(Stuff stuff) {
// do success stuff
}

@Override
public void onError(Throwable e) {
// do error stuff
}
}, null);

In GetStuffsUsecase:

public class GetStuffsUsecase extends UseCase<Stuff, Void> {


private MainRepository mMainRepository;

@Inject
public GetStuffsUsecase(MainRepository mainRepository) {
    super();
    mMainRepository = mainRepository;
}

@Override
public Single<Stuff> buildUseCaseObservable(Void aVoid) {
    return mMainRepository.getStuffs();
}
}

And inside MainRepository, these are the relevant parts of the code.

Inside the getStuffs method:

return downloadStuffList(stuff)
    .toList()
    .map(booleans -> stuff)
    .flatMap(schedule -> Single.just(stuff))
    .doOnError(throwable -> {
        // do error stuff
    })
    .subscribeOn(Schedulers.io());

And downloadStuffList is an Observable:

public Observable<Boolean> downloadStuffList(List<Stuff> stuffList) {
mediaList = new ArrayList<>();
// populate mediaList

return Observable.fromIterable(mediaList)
    .flatMap(medium -> downloadMedia(medium)).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
    .take(mediaList.size());
}

downloadMedia is another Observable:

Observable<Boolean> downloadMedia(Medium medium) {

    String fileName = Utils.getFileName(medium.getMediaPath());
    File file = new File(mContext.getExternalFilesDir(null)
            + File.separator + (fileName));
    if (!file.exists()) {
        return mApiManager.downloadFileWithDynamicUrlSync(medium.getMediaPath())
                .flatMap((Function<ResponseBody, ObservableSource<Boolean>>)
                        responseBody -> {
                            boolean verified = writeResponseBodyToDisk(responseBody, fileName).equalsIgnoreCase(medium.getChecksum());
                            if (!verified) {
                                File filex = new File(mContext.getExternalFilesDir(null)
                                        + File.separator + fileName);
                                filex.delete();
                            }
                            return Observable.just(verified);
                        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
    }
}

The ApiManager class has the following Observable:

public Observable<ResponseBody> downloadFileWithDynamicUrlSync(String url) {
return Observable.create((ObservableOnSubscribe<ResponseBody>)
    e -> service.downloadFileWithDynamicUrlSync(url).enqueue(new Callback<ResponseBody>() {
        @Override
        public void onResponse(Call<ResponseBody> call, Response<ResponseBody> response) {
            if (response.isSuccessful()) {
                e.onNext(response.body());
            } else e.onError(new Exception(response.errorBody().toString()));
        }

        @Override
        public void onFailure(Call<ResponseBody> call, Throwable t) {

        }
    })).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
 }

and ApiService is the interface:

public interface ApiService {
@Streaming
@GET
Call<ResponseBody> downloadFileWithDynamicUrlSync(@Url String fileUrl);
}

And finally, writeResponseBodyToDisk:

private String writeResponseBodyToDisk(ResponseBody body, String fileName) {
    try {
        File file = new File(mContext.getExternalFilesDir(null)
                + File.separator + fileName);

        try {

            long fileSize = body.contentLength();

            BufferedSource source = body.source();
            BufferedSink sink = Okio.buffer(Okio.sink(file));
            sink.flush();
            sink.close();
            Log.d("Download", "Downloading file  of size : " + fileSize);
            if (fileName.contains(".zip"))
                unpackZip(file);
            return calculateMD5(file.getAbsolutePath());
        } catch (IOException e) {
            return "";
        }
    } catch (IOException e) {
        return "";
    }
}
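The calculateMD5 helper called above is not shown in the question. As a rough illustration of the checksum comparison that downloadMedia performs with equalsIgnoreCase, here is a minimal sketch using only the JDK. The class name Md5Sketch and the methods md5Hex and matches are hypothetical, and the sketch hashes a byte array rather than a file path, so it is not the original helper.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not the calculateMD5 from the question.
final class Md5Sketch {

    // Returns the MD5 digest of data as a lowercase hex string.
    static String md5Hex(byte[] data) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(data);
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                // Mask to 0..255 so negative bytes format as two hex digits.
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // Case-insensitive comparison, mirroring equalsIgnoreCase in downloadMedia.
    static boolean matches(byte[] data, String expectedChecksum) {
        return md5Hex(data).equalsIgnoreCase(expectedChecksum);
    }

    public static void main(String[] args) {
        byte[] sample = "abc".getBytes(StandardCharsets.US_ASCII);
        System.out.println(md5Hex(sample));
        System.out.println(matches(sample, "900150983CD24FB0D6963F7D28E17F72"));
    }
}
```

Hashing is CPU and I/O work like the download itself, so it belongs on the same background thread as the file write.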

If I replace the BufferedSource and BufferedSink with an InputStream and an OutputStream and write the file using a while loop, the app does not crash, but it freezes until the download completes.
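For reference, the while-loop variant described above probably looks something like the sketch below. It uses only the JDK, with in-memory streams standing in for the response body and the target file. The names StreamCopySketch, copy, and copyOnWorker are assumptions for illustration. The loop itself is harmless; a freeze of this kind usually means the loop is running on the UI thread, so the sketch also runs it on a worker thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch; in-memory streams replace the network body and the file.
final class StreamCopySketch {

    // Copies everything from in to out in 8 KiB chunks and returns the byte count.
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[8192];
        long total = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        out.flush();
        return total;
    }

    // Runs the copy loop on a worker thread and returns the copied bytes.
    static byte[] copyOnWorker(byte[] payload) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Future<byte[]> result = worker.submit(() -> {
                try (InputStream in = new ByteArrayInputStream(payload);
                     ByteArrayOutputStream out = new ByteArrayOutputStream()) {
                    copy(in, out);
                    return out.toByteArray();
                }
            });
            // get() blocks only so this demo can return the bytes;
            // UI code would receive the result through a callback instead.
            return result.get();
        } catch (Exception e) {
            throw new IllegalStateException("copy failed", e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        byte[] payload = new byte[20000];
        System.out.println("copied bytes: " + copyOnWorker(payload).length);
    }
}
```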

I have not worked much with RxJava and cannot figure out why these operations are being executed on the main thread.
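One thing the stack trace suggests: ApiManager$3.onResponse is called from retrofit2.ExecutorCallAdapterFactory via android.os.Handler, which is consistent with Retrofit delivering enqueue callbacks on the Android main thread by default. If so, subscribeOn(Schedulers.io()) only decides where the request is started, while everything downstream of e.onNext(...) runs on the thread that invokes the callback. The sketch below models this with plain JDK executors. The class CallbackThreadDemo and the thread names "io-sim" and "main-sim" are hypothetical stand-ins, not RxJava or Retrofit APIs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical model of "start on one thread, deliver the callback on another".
final class CallbackThreadDemo {

    // Stand-in for an async client that hands its result to a fixed callback executor.
    static void enqueue(ExecutorService callbackExecutor, Consumer<String> callback) {
        callbackExecutor.execute(() -> callback.accept("response"));
    }

    // Returns the name of the thread on which the callback body actually ran.
    static String threadThatRanCallback() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-sim"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-sim"));
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            // Comparable to subscribeOn: only the act of starting the request runs on "io-sim".
            io.execute(() -> enqueue(main, response -> {
                // Comparable to the code after e.onNext(...): it runs where the callback is invoked.
                seen.set(Thread.currentThread().getName());
                done.countDown();
            }));
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callback never ran");
            }
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + threadThatRanCallback());
    }
}
```

Running it prints "callback ran on: main-sim" even though the request was started from "io-sim". In the question's code, the same reasoning would point at the enqueue call and at the observeOn(AndroidSchedulers.mainThread()) calls placed before the flatMap that writes the file.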

...