rxjava2.1.5: используйте rxjava2 с udpsocket в Android, ObservableOnSubscribe подписка не вызывает - PullRequest
0 голосов
/ 18 марта 2019

Я не хочу использовать сокет udp с rxjava2, вот основные коды.

public void receive(int port, int timeout) {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                try {

                    if (mSocket == null) {
                        mSocket = new DatagramSocket(null);
                        mSocket.setSoTimeout(timeout);
                        mSocket.setBroadcast(true);
                        mSocket.setReuseAddress(true);
                        mSocket.bind(new InetSocketAddress(port));
                    }

                    while (!mIsClosed) {
                        //接收数据的buf数组并指定大小
                        byte[] buf = new byte[1024];
                        //创建接收数据包,存储在buf中
                        DatagramPacket packet = new DatagramPacket(buf, buf.length);
                        multicastLock.acquire();

                        //接收操作
                        mSocket.receive(packet);
                        byte data[] = packet.getData();// 接收的数据

                        String result = new String(data);

                        //检查数据是否合法
                        if (checkMessageLegal(result)) {
                            emitter.onNext(result);
                        } else {
                            emitter.onError(new Exception("the data is illegal"));
                        }
                    }

                } catch (SocketException e) {
                    LogUtils.w(TAG, e.getMessage());
                    if (!mIsClosed) {
                        emitter.onError(e);
                    }
                } catch (IOException e) {
                    LogUtils.w(TAG, e.getMessage());
                    if (!mIsClosed) {
                        emitter.onError(e);
                    }
                }

            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                  mDisposable = d;
            }

            @Override
            public void onNext(String result) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
    }

Я вызываю receive(...) в onCreate () в действии, когда обратное действие вызывает mSocket.close () и mDisposable.dispose (). Но когда я звоню receive(...) несколько раз, subscribe() не может иногда выполняться. Я новичок в мире Rx и могу использовать некоторую помощь, чтобы выяснить, что не так. Заранее спасибо!

...