Я пытаюсь работать с библиотекой RxJava для Bluetooth на Android. Мои требования:
- Наблюдаемый для контроля состояния соединения
- Соединение автоматически перезапускается при любой ошибке
- Наблюдаемый для прослушивания сообщений от соединения
Это то, что у меня есть, но я не могу не думать, что это запутанно. Прежде всего, меня беспокоит то, что способ, которым я принудил ошибки из нисходящего потока, вызывать создание нового соединения. В частности, я использую тему публикации mCommErrors
, когда получаю ошибку чтения / анализа.
Я думаю, что все это необходимо, потому что мой клиент всегда будет подключен к наблюдаемой, созданной в applyIsConnected (), которая предотвращает переход refCount()
в 0 при любой ошибке.
Спасибо за любую информацию, которую вы можете предоставить. У меня гораздо большие проблемы с тупиковой ситуацией, связанной с оператором refCount (), поэтому я пытаюсь отсеять все возможные «плохие вещи», которые я совершил.
public class MyBluetoothDevice
{
private final MessageParser mParser;
private final Observable<MyBluetoothConnection> mConnectionObservable;
// The comms errors will be piped in so we can restart our connection as needed. These errors
// can originate post-connection when reading / sending messages
private final PublishSubject<Throwable> mCommsErrors = PublishSubject.create();
public MyBluetoothDevice(BluetoothDevice device, MessageParser parser, Scheduler scheduler)
{
mParser = parser;
mConnectionObservable = mCommsErrors
.switchMap(err -> Observable.<BluetoothSocket> error(err))
.mergeWith(createConnection(device, SPP_UUID))
.subscribeOn(scheduler)
.map(MyBluetoothConnection::new)
.retryWhen(err -> err.delay(5, TimeUnit.SECONDS))
.replay(1)
.refCount();
}
// Copy this from RxBluetooth since they forget to close the bluetooth socket if an error occurs
private Observable<BluetoothSocket> createConnection(final BluetoothDevice device, final UUID uuid)
{
return Observable.create(emitter -> {
try {
final BluetoothSocket bluetoothSocket = device.createRfcommSocketToServiceRecord(uuid);
emitter.setCancellable(() -> silentlyCloseSocket(bluetoothSocket));
// We don't have to handle closing the socket if connect fails, because setCancellable
// will cause the socket to be closed when the error is emitted
bluetoothSocket.connect();
emitter.onNext(bluetoothSocket);
} catch (IOException e) {
emitter.onError(e);
}
});
}
private void silentlyCloseSocket(BluetoothSocket socket)
{
try {
socket.close();
} catch (IOException ignore) {
}
}
public Observable<Boolean> observeIsConnected()
{
return mConnectionObservable
.switchMap(MyBluetoothConnection::observeIsConnected);
}
public Observable<TruPulseMessage> observeMessages()
{
return mConnectionObservable
.switchMap(connection -> connection
.observeStringLineStream()
.switchMap(mParser::parse)
.doOnError(err -> connection.closeConnection())
.doOnError(err -> mCommsErrors.onNext(err))
.onErrorResumeNext(Observable.empty())
);
}
public Completable send(String command)
{
return mConnectionObservable
.take(1)
.concatMapCompletable(connection -> connection.sendCommand(command))
.doOnError(mCommsErrors::onNext);
}
}
а вот и MyBluetoothConnection
public class MyBluetoothConnection
{
private final BluetoothSocket mSocket;
private final InputStream mInputStream;
private final OutputStream mOutputStream;
private final BehaviorSubject<Boolean> mIsConnected;
private Observable<String> mOutObservable;
MyBluetoothConnection(BluetoothSocket socket) throws Exception
{
if (socket == null) {
throw new InvalidParameterException("Bluetooth socket can't be null");
}
this.mSocket = socket;
this.mIsConnected = BehaviorSubject.createDefault(Boolean.TRUE);
try {
mInputStream = socket.getInputStream();
mOutputStream = socket.getOutputStream();
} catch (IOException e) {
closeConnection();
throw new IOException("Can't get stream from bluetooth socket", e);
}
}
public Observable<Boolean> observeIsConnected()
{
return mIsConnected;
}
public Observable<String> observeStringLineStream()
{
if (mOutObservable == null) {
mOutObservable = Observable.create((ObservableOnSubscribe<String>) emitter -> {
BufferedReader reader = new BufferedReader(new InputStreamReader(mInputStream));
try {
String line = "";
while (!emitter.isDisposed() && line != null) {
line = reader.readLine();
if (line != null)
emitter.onNext(line);
else
emitter.onComplete();
}
} catch (IOException e) {
closeConnection();
if (!emitter.isDisposed())
emitter.onError(e);
}
}).share();
}
return mOutObservable;
}
public Completable sendCommand(String command)
{
return Completable.create(emitter -> {
if (!mIsConnected.getValue()) {
emitter.onError(new IOException("BluetoothConnection is disconnected"));
return;
}
String line = command + "\r\n";
try {
mOutputStream.write(line.getBytes());
mOutputStream.flush();
emitter.onComplete();
} catch (IOException e) {
// Error occurred. Better to close terminate the connection
closeConnection();
if (!emitter.isDisposed())
emitter.onError(new IOException("Can't send Bluetooth command", e));
}
});
}
/**
* Close the streams and socket connection.
*/
public void closeConnection()
{
if (!mIsConnected.getValue())
return;
try {
if (mInputStream != null) {
mInputStream.close();
}
if (mOutputStream != null) {
mOutputStream.close();
}
if (mSocket != null) {
mSocket.close();
}
} catch (IOException ignored) {
} finally {
mIsConnected.onNext(Boolean.FALSE);
}
}
}