Rx прочитать файл и отправить данные 12 наблюдателей - PullRequest
0 голосов
/ 13 июня 2018

объясните последний код:

перед тем, как использовать приведенный ниже код для чтения и отправки обработчика этой даты.
код для чтения двоичного файла, выделенный 8 * 12 * 1002 байт, который является сигнальными данными (содержит 12 * 1002double value)

вопрос:

как я могу использовать rx_java2 для чтения этого файла и потоковой передачи Цикл, синхронный каждые 1002, двойной синхронный для 12 наблюдателей, которые публикуют двойное значение с задержкой каждые 2 мс

class readFile {


public fun getBuffer(sigId: String): ArrayList<DoubleArray> {


  val address="/data/data/com.example.amin.ecgs/Signals/Sig$sigId.bin"
    lateinit var buf: DoubleBuffer
    try {

        val rFile = RandomAccessFile(address, "rw")
        val inChannel = rFile.channel
        val buf_in = ByteBuffer.allocate(8 * 12 * 1002)
        buf_in.clear()
        inChannel.read(buf_in, 0)
        buf_in.rewind()
        buf = buf_in.asDoubleBuffer()

        inChannel.close()

    } catch (e: IOException) {
        e.printStackTrace()

    }

    return generateData(buf)

}

private fun generateData(buf: DoubleBuffer): ArrayList<DoubleArray> {

    val arrayList = ArrayList<DoubleArray>(12)

    for (n in 0..11) {
        val yb = DoubleArray(1002)
        buf.get(yb, n * 1002, (n + 1) * 1002)

        arrayList.add(yb)

    }
    buf.clear()
    return arrayList
}


}

1 Ответ

0 голосов
/ 28 июня 2018

Вы можете попробовать concat операцию с созданием каждого Observable или создать что-то из Observable.fromIterator(), как показано.Обратите внимание, что обработка ошибок становится сложной, когда вы делаете это, и когда вам приходится обрабатывать ошибки вниз по течению.Было бы лучше использовать Flowable из-за BackPressureStrategy, который вы можете определить с помощью onErrorResumeNext, если хотите продолжить работу со следующим массивом, если один из них завершится неудачей.

public class ReadBinaryDate extends Thread {

    final ArrayList<Observable<double[]>> doubleObservables = new ArrayList<>();

    private Observable<double[]> generateObservable (final double [] array1002) {
        return Observable.interval(2, TimeUnit.MILLISECONDS)
            .flatMap(new Function<Long, ObservableSource<double[]>>() {
                @Override
                public ObservableSource<double[]> apply(Long aLong) throws Exception {
                    return Observable.just(array1002);
                }
            });
    }

    private void generateData() {

        arrayList.clear();
        for (int n = 0; n < 12; n++) {
            double[] yb = new double[1002];
            buf.get(yb, n*1002, (n+1)*1002);
            doubleObservables.add(generateObservable(yb));
        }
    }

    private void subscribeStream() {
        Observable.fromIterable(doubleObservables)
            .subscribe(new Observer<Observable<double[]>>() {
                @Override
                public void onSubscribe(Disposable d) { }

                @Override
                public void onNext(Observable<double[]> observable) {
                    // do on each array
                }

                @Override
                public void onError(Throwable e) {
                    // handle error
                }

                @Override
                public void onComplete() {
                    // do on complete
                }
            });
    }
}
...