Текучий, который ничего не передает в onNext () DisposableSubscriber в Android-работника (менеджер служб) - PullRequest
0 голосов
/ 13 января 2019

Я хочу загрузить файл в рабочий Android и показать прогресс загрузки для этого приостановите рабочий поток и дождитесь завершения задачи загрузки файла.

загрузка на сервер успешна, а эмиттер передает текущее значение, но DisposableSubscriber не вызывайте onNext() метод !!

ниже кода показывает, как загрузить файл и вывести текущий прогресс

  @Override
public Flowable<Double> uploadFile(Message message, String ticket_id) {
     ///create Flowable
    return Flowable.create((FlowableEmitter<Double> emitter) -> {
        try {
             //call api
            Timber.w("upload file");
            ResponseBody response = api.postFile(
                    createMultipartBody(message, emitter),
                    RequestBody.create(MediaType.parse("text/plain"), "file"),
                    RequestBody.create(MediaType.parse("text/plain"), message.getFileType()),
                    ticket_id,
                    getApiKey()
            )
                    .doOnError(emitter::tryOnError)
                    .blockingGet();
            emitter.onComplete();
        } catch (Exception e) {
            emitter.tryOnError(e);
        }
    }, BackpressureStrategy.LATEST);


}

private MultipartBody.Part createMultipartBody(Message message, FlowableEmitter<Double> emitter) {
    File file;
    if (message.getFileType().equals("File")) {
        file = new File(message.getUrlFile());

        return MultipartBody.Part.createFormData("file", file.getName(),
                createCountingRequestBody(file, emitter));
    } else {
        file = new File(message.getImageUrl());

        return MultipartBody.Part.createFormData("file", file.getName(),
                createCountingRequestBody(file, emitter));

    }
}


private RequestBody createCountingRequestBody(File file, FlowableEmitter<Double> emitter) {
    RequestBody requestBody = createRequestBody(file);
    return new CountingRequestBody(requestBody, (bytesWritten, contentLength) -> {
        emitter.onNext( (1.0 * bytesWritten) / contentLength);
    });
}


private RequestBody createRequestBody(File file) {
    return RequestBody.create(MediaType.parse("multipart/form-data"), file);
}

и вот мой рабочий класс, который вызывает uploadFile() метод

public class SenderService extends Worker {
// Define the parameter keys:
public static final String FILE_ADDRESS = "file_address";
public static final String FILE_TYPE = "type";
public static final String TICKET_ID = "ticket_id";
public static final String MESSAGE_ID = "message_id";
private final Message message;
private Data data;
@Inject
Repository repository;
private CountDownLatch latch;
private Result result;
private String ticketId;


public SenderService(@NonNull Context context, @NonNull WorkerParameters workerParams) {
    super(context, workerParams);
    setupComponent(((App) context.getApplicationContext()).getAppComponent());
    message=new Message();
    data=workerParams.getInputData();

    message.setFileType(data.getString(FILE_TYPE));
    message.setUrlFile(data.getString(FILE_ADDRESS));
    message.setId(data.getString(MESSAGE_ID));
    ticketId=data.getString(TICKET_ID);
    Timber.w("service called");
    latch = new CountDownLatch(1);


}

@NonNull
@Override
public Result doWork() {
  //check is  it new task or a failure task 
     if (message.getId()==null) {
    try(Realm realm=Realm.getDefaultInstance()){
            TicketDetail ticket = realm.where(TicketDetail.class).equalTo("id", ticketId).findFirst();
            String message_id=UUID.randomUUID().toString();
            message.setId(message_id);
            data = new Data.Builder()
                    .putString(SenderService.FILE_ADDRESS, message.getUrlFile())
                    .putString(SenderService.FILE_TYPE, message.getId())
                    .putString(SenderService.TICKET_ID,ticketId)
                    .putString(SenderService.MESSAGE_ID,message_id)
                    .build();
            realm.executeTransaction(realm1 -> {
                realm1.insert(message);
                RealmList<Message> messages = ticket
                        .getMessages();
                if (messages != null) {
                    messages.add(message);
                    ticket.setMessages(messages);
                } else {
                    messages = new RealmList<>();
                    messages.add(message);
                    ticket.setMessages(messages);
                }
            });

        }
    }
    //call repo and upload file
    repository.uploadFile(message,ticketId)
            .subscribeOn(Schedulers.io())
            .onBackpressureLatest()
                .subscribe(new DisposableSubscriber<Double>() {
                    @Override
                    protected void onStart() {
                        super.onStart();
                    }

                    @Override
        public void onNext(Double aDouble) {
              try (Realm realm = Realm.getDefaultInstance()) {
                        realm.executeTransaction(realm1 -> {
                            realm1.where(TicketDetail.class).equalTo("id", ticketId).findFirst().getMessages()
                                    .where().equalTo("id", message.getId()).findFirst().setProgress(aDouble);
                        });

                    }
            Timber.w("onNext:" +aDouble);
        }

        @Override
        public void onError(Throwable t) {
            Timber.e(t);
            result = Result.failure(data);
            latch.countDown();
        }

        @Override
        public void onComplete() {
            Timber.w("onComplete");
            repository.getTicketDetail(ticketId);
            result = Result.success();
            latch.countDown();
        }

    });

    try {
        latch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return result;
}



protected void setupComponent(@NonNull AppComponent parentComponent) {
    DaggerSenderServiceComponent.builder()
            .appComponent(parentComponent)
            .senderServiceModule(new SenderServiceModule(this))
            .build()
            .inject(this);
}




}

Ответ : я использую throttleLatest(200,TimeUnit.MILLISECONDS) и моя проблема исправлена ​​:) throttleLatest испускает следующий элемент, испускаемый реактивным источником, затем периодически испускает последний элемент для контроля количества значений прогресса обновления за единицу времени:)

...