У меня есть этот класс (не-Rx), который одновременно запускает 1000 потоков производителей и 1000 потоков потребителей, а затем ждет, пока они не обмениваются предварительно определенным количеством сообщений, через простую реализацию очереди блокировки. После завершения этого процесса наблюдатели уведомили о результате:
public class ProducerConsumerBenchmarkUseCase extends BaseObservable<ProducerConsumerBenchmarkUseCase.Listener> {
public static interface Listener {
void onBenchmarkCompleted(Result result);
}
public static class Result {
private final long mExecutionTime;
private final int mNumOfReceivedMessages;
public Result(long executionTime, int numOfReceivedMessages) {
mExecutionTime = executionTime;
mNumOfReceivedMessages = numOfReceivedMessages;
}
public long getExecutionTime() {
return mExecutionTime;
}
public int getNumOfReceivedMessages() {
return mNumOfReceivedMessages;
}
}
private static final int NUM_OF_MESSAGES = 1000;
private static final int BLOCKING_QUEUE_CAPACITY = 5;
private final Object LOCK = new Object();
private final Handler mUiHandler = new Handler(Looper.getMainLooper());
private final MyBlockingQueue mBlockingQueue = new MyBlockingQueue(BLOCKING_QUEUE_CAPACITY);
private int mNumOfFinishedConsumers;
private int mNumOfReceivedMessages;
private long mStartTimestamp;
public void startBenchmarkAndNotify() {
synchronized (LOCK) {
mNumOfReceivedMessages = 0;
mNumOfFinishedConsumers = 0;
mStartTimestamp = System.currentTimeMillis();
}
// watcher-reporter thread
new Thread(() -> {
synchronized (LOCK) {
while (mNumOfFinishedConsumers < NUM_OF_MESSAGES) {
try {
LOCK.wait();
} catch (InterruptedException e) {
return;
}
}
}
notifySuccess();
}).start();
// producers init thread
new Thread(() -> {
for (int i = 0; i < NUM_OF_MESSAGES; i++) {
startNewProducer(i);
}
}).start();
// consumers init thread
new Thread(() -> {
for (int i = 0; i < NUM_OF_MESSAGES; i++) {
startNewConsumer();
}
}).start();
}
private void startNewProducer(final int index) {
new Thread(() -> mBlockingQueue.put(index)).start();
}
private void startNewConsumer() {
new Thread(() -> {
int message = mBlockingQueue.take();
synchronized (LOCK) {
if (message != -1) {
mNumOfReceivedMessages++;
}
mNumOfFinishedConsumers++;
LOCK.notifyAll();
}
}).start();
}
private void notifySuccess() {
mUiHandler.post(() -> {
Result result;
synchronized (LOCK) {
result =
new Result(
System.currentTimeMillis() - mStartTimestamp,
mNumOfReceivedMessages
);
}
for (Listener listener : getListeners()) {
listener.onBenchmarkCompleted(result);
}
});
}
}
Теперь я хочу изменить его на Rx. До сих пор мне удалось продвинуться так далеко:
public class ProducerConsumerBenchmarkUseCase {
public static class Result {
private final long mExecutionTime;
private final int mNumOfReceivedMessages;
public Result(long executionTime, int numOfReceivedMessages) {
mExecutionTime = executionTime;
mNumOfReceivedMessages = numOfReceivedMessages;
}
public long getExecutionTime() {
return mExecutionTime;
}
public int getNumOfReceivedMessages() {
return mNumOfReceivedMessages;
}
}
private static final int NUM_OF_MESSAGES = 1000;
private static final int BLOCKING_QUEUE_CAPACITY = 5;
private final MyBlockingQueue mBlockingQueue = new MyBlockingQueue(BLOCKING_QUEUE_CAPACITY);
private final AtomicInteger mNumOfFinishedConsumers = new AtomicInteger(0);
private final AtomicInteger mNumOfReceivedMessages = new AtomicInteger(0);
private volatile long mStartTimestamp;
public Maybe<Result> startBenchmark() {
return Maybe.fromCallable(new Callable<Result>() {
@Override
public Result call() {
mNumOfReceivedMessages.set(0);
mNumOfFinishedConsumers.set(0);
mStartTimestamp = System.currentTimeMillis();
Observable.range(0, NUM_OF_MESSAGES)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.forEach(
index -> newProducer(index).subscribe()
);
Observable.range(0, NUM_OF_MESSAGES)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.map(index -> newConsumer())
.doOnNext(completable -> completable.subscribe())
.flatMap(completable -> { return Observable.just(completable); })
.toList()
.blockingGet();
return new Result(
System.currentTimeMillis() - mStartTimestamp,
mNumOfReceivedMessages.get()
);
}
});
}
private Completable newProducer(final int index) {
return Completable
.fromAction(() -> mBlockingQueue.put(index))
.subscribeOn(Schedulers.io());
}
private Completable newConsumer() {
return Completable
.fromAction(() -> {
int message = mBlockingQueue.take();
if (message != -1) {
mNumOfReceivedMessages.incrementAndGet();
}
})
.subscribeOn(Schedulers.io());
}
}
Этот код компилируется, выполняется и даже завершается, но итоговое количество обмениваемых сообщений меньше 1000, что означает, что существует какая-то проблемаздесь.
Что я сделал не так?