Это забавный вопрос:)
Почему PushStream медленнее?Что я делаю не так?
Спасибо, что не просто предположили, что реализация PushStream отстой.В этом случае это происходит медленнее, потому что (вероятно, не осознавая) вы просили это сделать!
Часть 1 - Буферизация
По умолчанию PushStreams буферизуются.Это означает, что они включают очередь, в которую помещаются события до их обработки.Поэтому буферизация делает несколько вещей, которые негативно влияют на скорость пропускной способности.
- Добавляет дополнительный шаг очереди / очереди в конвейер
- Добавляет дополнительный переключатель потока в обработке событий
- Политика по умолчанию для буфера - возвращать противодавление, связанное с тем, насколько заполнен буфер.
В этом случае подавляющее большинство замедления происходит из-за противодавления.Когда вы создаете поток, используя psp.createStream(source)
, он настраивается с буфером из 32 элементов и линейной политикой обратного давления, основанной на размере буфера, возвращая одну секунду при заполнении и 31 миллисекунд, когда в нем есть один элемент.Стоит отметить, что 31 миллис на элемент добавляет до 30 секунд!
Важно, что SimplePushEventSource всегда выполняет запросы обратного давления от потребителей, которые к нему добавляются.Это означает, что вы можете закачивать события в SimplePushEventSource так быстро, как только можете, но они будут доставляться только так быстро, как их запрашивает конвейер.
Если мы удалим буферизацию из потоков push, которые вызатем создаем следующий тест:
@Test
public void testPushStream2() throws Exception {
final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
final PushStreamProvider psp = new PushStreamProvider();
final SimplePushEventSource<Integer> source =
psp.buildSimpleEventSource(Integer.class)
.withQueuePolicy(QueuePolicyOption.BLOCK)
.build();
final Deferred<Instant> startD = pf.deferred();
final Deferred<Instant> endD = pf.deferred();
psp.buildStream(source).unbuffered().build().onClose(() -> endD.resolve( Instant.now()) ).forEach((i) -> {
if (i == 0) {
startD.resolve( Instant.now() );
}
});
final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build().count();
for (int i = 0; i < 1000; i++) {
source.publish(i);
}
source.endOfStream();
System.out.println("PushStream needs "
+ Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
+ " milliseconds to process " + nbEvent.getValue() + " events.");
}
Результат выполнения этого (на моей машине):
PushStream needs 39 milliseconds to process 1000 events.
Это, очевидно, намного ближе к тому, что вы ожидаете,но все равно заметно медленнее.Обратите внимание, что мы могли бы еще иметь некоторую буферизацию, но настроили PushbackPolicy.Это дало бы нам более высокую пропускную способность, но не так быстро, как это.
Часть 2 - Длина конвейера
Следующее, что следует отметить, - это использование обработчика onClose()
.Это добавляет дополнительную стадию в ваш конвейер push-потока.На самом деле вы можете переместить onClose в результате выполнения обещания, уменьшив длину вашего конвейера (вам нужно запустить его только один раз).
@Test
public void testPushStream3() throws Exception {
final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
final PushStreamProvider psp = new PushStreamProvider();
final SimplePushEventSource<Integer> source =
psp.buildSimpleEventSource(Integer.class)
.withQueuePolicy(QueuePolicyOption.BLOCK)
.build();
final Deferred<Instant> startD = pf.deferred();
final Deferred<Instant> endD = pf.deferred();
psp.buildStream(source).unbuffered().build().forEach((i) -> {
if (i == 0) {
startD.resolve( Instant.now() );
}
});
final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build().count()
.onResolve(() -> endD.resolve( Instant.now()));
for (int i = 0; i < 1000; i++) {
source.publish(i);
}
source.endOfStream();
System.out.println("PushStream needs "
+ Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
+ " milliseconds to process " + nbEvent.getValue() + " events.");
}
Результат этой версии (на моей машине)::
PushStream needs 21 milliseconds to process 1000 events.
Часть 3. Мультиплексирование доставки
Ключевое различие между примером «очереди блокировки необработанного массива» и примером PushStream заключается в том, что вы фактически создаете two PushStreams,Первый выполняет работу для определения времени начала, второй - для подсчета событий.Это заставляет SimplePushEventSource мультиплексировать события по нескольким потребителям.
Что, если мы свернули поведение в один конвейер, чтобы SimplePushEventSource мог использовать доставку по быстрому пути?
@Test
public void testPushStream4() throws Exception {
final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
final PushStreamProvider psp = new PushStreamProvider();
final SimplePushEventSource<Integer> source =
psp.buildSimpleEventSource(Integer.class)
.withQueuePolicy(QueuePolicyOption.BLOCK)
.build();
final Deferred<Instant> startD = pf.deferred();
final Deferred<Instant> endD = pf.deferred();
final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build()
.filter(i -> {
if (i == 0) {
startD.resolve( Instant.now() );
}
return true;
})
.count()
.onResolve(() -> endD.resolve( Instant.now()));
for (int i = 0; i < 1000; i++) {
source.publish(i);
}
source.endOfStream();
System.out.println("PushStream needs "
+ Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
+ " milliseconds to process " + nbEvent.getValue() + " events.");
}
Результат этой версии (на моей машине):
PushStream needs 3 milliseconds to process 1000 events.
Сводка
PushStreams - это быстрый и эффективный способ использовать асинхронно поступающие события, но очень важно понять, что такое буферизацияповедение подходит для вашего приложения.Если у вас есть большой кусок данных, который вы хотите перебрать очень быстро, вам нужно быть осторожным при настройке, так как настройки буферизации по умолчанию предназначены для другого варианта использования!