Ожидание завершения нескольких задач с использованием RxJava - PullRequest
1 голос
/ 15 октября 2019

У меня есть этот класс (не-Rx), который одновременно запускает 1000 потоков производителей и 1000 потоков потребителей, а затем ждет, пока они не обмениваются предварительно определенным количеством сообщений, через простую реализацию очереди блокировки. После завершения этого процесса наблюдатели уведомили о результате:

public class ProducerConsumerBenchmarkUseCase extends BaseObservable<ProducerConsumerBenchmarkUseCase.Listener> {

    public static interface Listener {
        void onBenchmarkCompleted(Result result);
    }

    public static class Result {
        private final long mExecutionTime;
        private final int mNumOfReceivedMessages;

        public Result(long executionTime, int numOfReceivedMessages) {
            mExecutionTime = executionTime;
            mNumOfReceivedMessages = numOfReceivedMessages;
        }

        public long getExecutionTime() {
            return mExecutionTime;
        }

        public int getNumOfReceivedMessages() {
            return mNumOfReceivedMessages;
        }
    }

    private static final int NUM_OF_MESSAGES = 1000;
    private static final int BLOCKING_QUEUE_CAPACITY = 5;

    private final Object LOCK = new Object();

    private final Handler mUiHandler = new Handler(Looper.getMainLooper());

    private final MyBlockingQueue mBlockingQueue = new MyBlockingQueue(BLOCKING_QUEUE_CAPACITY);

    private int mNumOfFinishedConsumers;

    private int mNumOfReceivedMessages;

    private long mStartTimestamp;


    public void startBenchmarkAndNotify() {

        synchronized (LOCK) {
            mNumOfReceivedMessages = 0;
            mNumOfFinishedConsumers = 0;
            mStartTimestamp = System.currentTimeMillis();
        }

        // watcher-reporter thread
        new Thread(() -> {
            synchronized (LOCK) {
                while (mNumOfFinishedConsumers < NUM_OF_MESSAGES) {
                    try {
                        LOCK.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
            notifySuccess();
        }).start();

        // producers init thread
        new Thread(() -> {
            for (int i = 0; i < NUM_OF_MESSAGES; i++) {
                startNewProducer(i);
            }
        }).start();

        // consumers init thread
        new Thread(() -> {
            for (int i = 0; i < NUM_OF_MESSAGES; i++) {
                startNewConsumer();
            }
        }).start();
    }


    private void startNewProducer(final int index) {
        new Thread(() -> mBlockingQueue.put(index)).start();
    }

    private void startNewConsumer() {
        new Thread(() -> {
            int message = mBlockingQueue.take();
            synchronized (LOCK) {
                if (message != -1) {
                    mNumOfReceivedMessages++;
                }
                mNumOfFinishedConsumers++;
                LOCK.notifyAll();
            }
        }).start();
    }

    private void notifySuccess() {
        mUiHandler.post(() -> {
            Result result;
            synchronized (LOCK) {
                 result =
                        new Result(
                                System.currentTimeMillis() - mStartTimestamp,
                                mNumOfReceivedMessages
                        );
            }
            for (Listener listener : getListeners()) {
                listener.onBenchmarkCompleted(result);
            }
        });
    }


}

Теперь я хочу изменить его на Rx. До сих пор мне удалось продвинуться так далеко:

public class ProducerConsumerBenchmarkUseCase {

    public static class Result {
        private final long mExecutionTime;
        private final int mNumOfReceivedMessages;

        public Result(long executionTime, int numOfReceivedMessages) {
            mExecutionTime = executionTime;
            mNumOfReceivedMessages = numOfReceivedMessages;
        }

        public long getExecutionTime() {
            return mExecutionTime;
        }

        public int getNumOfReceivedMessages() {
            return mNumOfReceivedMessages;
        }
    }

    private static final int NUM_OF_MESSAGES = 1000;
    private static final int BLOCKING_QUEUE_CAPACITY = 5;

    private final MyBlockingQueue mBlockingQueue = new MyBlockingQueue(BLOCKING_QUEUE_CAPACITY);

    private final AtomicInteger mNumOfFinishedConsumers = new AtomicInteger(0);

    private final AtomicInteger mNumOfReceivedMessages = new AtomicInteger(0);

    private volatile long mStartTimestamp;


    public Maybe<Result> startBenchmark() {

        return Maybe.fromCallable(new Callable<Result>() {
            @Override
            public Result call() {

                mNumOfReceivedMessages.set(0);
                mNumOfFinishedConsumers.set(0);
                mStartTimestamp = System.currentTimeMillis();

                Observable.range(0, NUM_OF_MESSAGES)
                        .subscribeOn(Schedulers.io())
                        .observeOn(Schedulers.io())
                        .forEach(
                                index -> newProducer(index).subscribe()
                        );

                Observable.range(0, NUM_OF_MESSAGES)
                        .subscribeOn(Schedulers.io())
                        .observeOn(Schedulers.io())
                        .map(index -> newConsumer())
                        .doOnNext(completable -> completable.subscribe())
                        .flatMap(completable -> { return Observable.just(completable); })
                        .toList()
                        .blockingGet();


                return new Result(
                        System.currentTimeMillis() - mStartTimestamp,
                        mNumOfReceivedMessages.get()
                );

            }
        });

    }

    private Completable newProducer(final int index) {
        return Completable
                .fromAction(() -> mBlockingQueue.put(index))
                .subscribeOn(Schedulers.io());
    }

    private Completable newConsumer() {
        return Completable
                .fromAction(() -> {
                    int message = mBlockingQueue.take();
                    if (message != -1) {
                        mNumOfReceivedMessages.incrementAndGet();
                    }
                })
                .subscribeOn(Schedulers.io());
    }

}

Этот код компилируется, выполняется и даже завершается, но итоговое количество обмениваемых сообщений меньше 1000, что означает, что существует какая-то проблемаздесь.

Что я сделал не так?

1 Ответ

4 голосов
/ 15 октября 2019

Я не до конца понимаю, почему вы хотите выполнить этот тип обработки. Также я не думаю, что есть необходимость в подсчете сообщений, потому что вы должны генерировать 1000 сообщений. Обратите внимание, что с этим типом теста вы, скорее всего, измерите, насколько быстро система может создать 2000 потоков.

public Maybe<Result> startBenchmark() {
    return 
        Flowable.range(0, NUM_OF_MESSAGES)
        .flatMap(id -> 
             Flowable.fromCallable(() -> id) // <-- generate message
             .subscribeOn(Schedulers.io())
        )
        .parallel(NUM_OF_MESSAGES)
        .runOn(Schedulers.io())
        .doOnNext(msg -> { })  // <-- process message
        .sequential()
        .count()
        .doOnSubscribe(s -> { mStartTimestamp = System.currentTimeMillis(); })
        .map(cnt -> new Result(System.currentTimeMillis() - mStartTimestamp, cnt))
        .toMaybe();
 }
...