Поскольку у меня нет большого опыта работы с RxJava / RxAndroid, я пытаюсь что-то реализовать, и я не уверен, как это сделать .. Проблема, с которой я сталкиваюсь, заключается в том, что эта операция блокирует мой поток пользовательского интерфейса..
Сначала я инициализирую DataOutputStream и класс, который будет что-то читать и записывать в него:
public static void startReceiveLogs() {
mReadFlag = true;
try {
os = new DataOutputStream(new FileOutputStream("/sdcard/myApp/" + TimeUtils.getCurrentTime(TimeZone.getDefault()) + "_logs.uedl"));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
rxLogsReaderClass = new RxLogsReaderClass();
rxLogsReaderClass.startListening();
}
Это класс:
static class RxLogsReaderClass {
byte[] rbuf = new byte[64];
public void startListening() {
ExecutorService executor = Executors.newFixedThreadPool(2);
Observable<Integer> myObs = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(mSerial.read(rbuf, mSerialPortLog));
emitter.onComplete();
}
});
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
while (mReadFlag) {
try {
int len = mSerial.readLog(rbuf, mSerialPortLog);
if (len > 0) {
os.write(rbuf, 0, len);
}
} catch (NullPointerException e1) {
e1.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
myObs.subscribeOn(Schedulers.io());
myObs.observeOn(Schedulers.from(executor));
myObs.subscribe(observer);
}
}
Иэто метод чтения, который должен вызывать onNext () ... он просто читает что-то из USBDeviceConnection и возвращает int:
public int readLog(byte[] buf, int channel) {
if (DeviceisXR2280X == 1) {
channel = 0;
}
int returnValue;
try {
returnValue = this.mDeviceConnection.bulkTransfer(this.mEXAREndpointIN[channel], buf, buf.length, 10000);
} catch (ArrayIndexOutOfBoundsException aioue) {
returnValue = this.mDeviceConnection.bulkTransfer(this.mEXAREndpointIN[this.mEXAREndpointIN.length - 1], buf, buf.length, 10000);
}
return returnValue;
}
Я пытался наблюдать на этом и на AndroidSchedulers.mainThread (), но в каждомесли это блокирует основной поток ... Можете ли вы сказать мне, что я делаю не так здесь ...