RxJava2 - передача ошибок в нисходящем направлении - PullRequest
0 голосов
/ 27 апреля 2018

Я пытаюсь работать с библиотекой RxJava для Bluetooth на Android. Мои требования:

  1. Наблюдаемый для контроля состояния соединения
  2. Соединение автоматически перезапускается при любой ошибке
  3. Наблюдаемый для прослушивания сообщений от соединения

Это то, что у меня есть, но я не могу не думать, что это запутанно. Прежде всего, меня беспокоит то, что способ, которым я принудил ошибки из нисходящего потока, вызывать создание нового соединения. В частности, я использую тему публикации mCommErrors, когда получаю ошибку чтения / анализа.

Я думаю, что все это необходимо, потому что мой клиент всегда будет подключен к наблюдаемой, созданной в applyIsConnected (), которая предотвращает переход refCount() в 0 при любой ошибке.

Спасибо за любую информацию, которую вы можете предоставить. У меня гораздо большие проблемы с тупиковой ситуацией, связанной с оператором refCount (), поэтому я пытаюсь отсеять все возможные «плохие вещи», которые я совершил.

public class MyBluetoothDevice
{   
    private final MessageParser mParser;
    private final Observable<MyBluetoothConnection> mConnectionObservable;
    // The comms errors will be piped in so we can restart our connection as needed. These errors
    // can originate post-connection when reading / sending messages
    private final PublishSubject<Throwable> mCommsErrors = PublishSubject.create();

public MyBluetoothDevice(BluetoothDevice device, MessageParser parser, Scheduler scheduler)
{
    mParser = parser;

    mConnectionObservable = mCommsErrors
            .switchMap(err -> Observable.<BluetoothSocket> error(err))
            .mergeWith(createConnection(device, SPP_UUID))
            .subscribeOn(scheduler)
            .map(MyBluetoothConnection::new)
            .retryWhen(err -> err.delay(5, TimeUnit.SECONDS))
            .replay(1)
            .refCount();
}

// Copy this from RxBluetooth since they forget to close the bluetooth socket if an error occurs
private Observable<BluetoothSocket> createConnection(final BluetoothDevice device, final UUID uuid)
{
    return Observable.create(emitter -> {
        try {
            final BluetoothSocket bluetoothSocket = device.createRfcommSocketToServiceRecord(uuid);
            emitter.setCancellable(() -> silentlyCloseSocket(bluetoothSocket));
            // We don't have to handle closing the socket if connect fails, because setCancellable
            // will cause the socket to be closed when the error is emitted
            bluetoothSocket.connect();
            emitter.onNext(bluetoothSocket);
        } catch (IOException e) {
            emitter.onError(e);
        }
    });
}

private void silentlyCloseSocket(BluetoothSocket socket)
{
    try {
        socket.close();
    } catch (IOException ignore) {
    }
}

public Observable<Boolean> observeIsConnected()
{
    return mConnectionObservable
            .switchMap(MyBluetoothConnection::observeIsConnected);
}

public Observable<TruPulseMessage> observeMessages()
{
    return mConnectionObservable
            .switchMap(connection -> connection
                    .observeStringLineStream()
                    .switchMap(mParser::parse)
                    .doOnError(err -> connection.closeConnection())
                    .doOnError(err -> mCommsErrors.onNext(err))
                    .onErrorResumeNext(Observable.empty())
                    );
}

public Completable send(String command)
{
    return mConnectionObservable
            .take(1)
            .concatMapCompletable(connection -> connection.sendCommand(command))
            .doOnError(mCommsErrors::onNext);
}
}

а вот и MyBluetoothConnection

public class MyBluetoothConnection
{
private final BluetoothSocket mSocket;
private final InputStream mInputStream;
private final OutputStream mOutputStream;
private final BehaviorSubject<Boolean> mIsConnected;
private Observable<String> mOutObservable;

MyBluetoothConnection(BluetoothSocket socket) throws Exception
{
    if (socket == null) {
        throw new InvalidParameterException("Bluetooth socket can't be null");
    }

    this.mSocket = socket;
    this.mIsConnected = BehaviorSubject.createDefault(Boolean.TRUE);

    try {
        mInputStream = socket.getInputStream();
        mOutputStream = socket.getOutputStream();
    } catch (IOException e) {
        closeConnection();
        throw new IOException("Can't get stream from bluetooth socket", e);
    }
}

public Observable<Boolean> observeIsConnected()
{
    return mIsConnected;
}

public Observable<String> observeStringLineStream()
{
    if (mOutObservable == null) {
        mOutObservable = Observable.create((ObservableOnSubscribe<String>) emitter -> {
            BufferedReader reader = new BufferedReader(new InputStreamReader(mInputStream));
            try {
                String line = "";
                while (!emitter.isDisposed() && line != null) {
                    line = reader.readLine();
                    if (line != null)
                        emitter.onNext(line);
                    else
                        emitter.onComplete();
                }
            } catch (IOException e) {
                closeConnection();
                if (!emitter.isDisposed())
                    emitter.onError(e);
            }
        }).share();
    }
    return mOutObservable;
}

public Completable sendCommand(String command)
{
    return Completable.create(emitter -> {
        if (!mIsConnected.getValue()) {
            emitter.onError(new IOException("BluetoothConnection is disconnected"));
            return;
        }

        String line = command + "\r\n";
        try {
            mOutputStream.write(line.getBytes());
            mOutputStream.flush();
            emitter.onComplete();
        } catch (IOException e) {
            // Error occurred. Better to close terminate the connection
            closeConnection();
            if (!emitter.isDisposed())
                emitter.onError(new IOException("Can't send Bluetooth command", e));
        }
    });
}

/**
 * Close the streams and socket connection.
 */
public void closeConnection()
{
    if (!mIsConnected.getValue())
        return;

    try {
        if (mInputStream != null) {
            mInputStream.close();
        }

        if (mOutputStream != null) {
            mOutputStream.close();
        }

        if (mSocket != null) {
            mSocket.close();
        }
    } catch (IOException ignored) {
    } finally {
        mIsConnected.onNext(Boolean.FALSE);
    }
}
}

1 Ответ

0 голосов
/ 27 апреля 2018

Непонятно, почему у вас две отдельные цепочки наблюдателей. Если ваша основная цепочка наблюдателей устанавливает соединение, считывает строки из соединения, выдает ошибки и завершает работу, то вы можете применить свою логику повторения к одной цепочке наблюдателей. Оператор using() создает наблюдаемый элемент, который закрывается после завершения. Оператор switchMap() закроет соединение при ошибке или обрыве и откроет новое при создании нового сокета.

createConnection( device, id )
  .switchMap( socket -> Observable.using( 
                        () -> new MyBlueToothConnection( socket ),
                        connection -> connection.observeStringLineStream(),
                        connection -> connection.close() ) )
 .retryWhen( err -> err.delay(5, TimeUnit.SECONDS) )
 ...

Эта наблюдаемая будет излучать струны из устройства с голубым зубом столько, сколько вам нужно.

...