Я не хочу использовать сокет udp с rxjava2, вот основные коды.
public void receive(int port, int timeout) {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
try {
if (mSocket == null) {
mSocket = new DatagramSocket(null);
mSocket.setSoTimeout(timeout);
mSocket.setBroadcast(true);
mSocket.setReuseAddress(true);
mSocket.bind(new InetSocketAddress(port));
}
while (!mIsClosed) {
//接收数据的buf数组并指定大小
byte[] buf = new byte[1024];
//创建接收数据包,存储在buf中
DatagramPacket packet = new DatagramPacket(buf, buf.length);
multicastLock.acquire();
//接收操作
mSocket.receive(packet);
byte data[] = packet.getData();// 接收的数据
String result = new String(data);
//检查数据是否合法
if (checkMessageLegal(result)) {
emitter.onNext(result);
} else {
emitter.onError(new Exception("the data is illegal"));
}
}
} catch (SocketException e) {
LogUtils.w(TAG, e.getMessage());
if (!mIsClosed) {
emitter.onError(e);
}
} catch (IOException e) {
LogUtils.w(TAG, e.getMessage());
if (!mIsClosed) {
emitter.onError(e);
}
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
@Override
public void onNext(String result) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
Я вызываю receive(...)
в onCreate () в действии, когда обратное действие вызывает mSocket.close () и mDisposable.dispose (). Но когда я звоню receive(...)
несколько раз, subscribe()
не может иногда выполняться. Я новичок в мире Rx и могу использовать некоторую помощь, чтобы выяснить, что не так. Заранее спасибо!