Я пытаюсь проверить задержку сообщений, отправленных через общую / горячую наблюдаемую.Я заметил, что когда у меня есть несколько наблюдателей в одной общей наблюдаемой, один наблюдатель вызывается n раз из одного сообщения (где n - количество наблюдателей в общей наблюдаемой).
Я запустил приведенный ниже код с 10 наблюдателями и 1 сообщением на наблюдателя , и каждый наблюдатель вызывается 10 раз для каждого сообщения (что подразумевает общее количество наблюдателей 100.next ()звонки).Из моего понимания наблюдателей / наблюдаемых, каждый наблюдатель должен вызываться только один раз за сообщение .Я просто неправильно использую оператор share()
?Или мое понимание этого в целом некорректно?
const getMessageLatency = (observersCount, messagesPerObserver) => {
const completedMessages = [];
const source = new Subject();
const sharedObservable = source.pipe(
tap((message) => console.log(`Subject: Incoming for ${message.id}`)),
share()
);
// Setup observers
for (i = 0; i < observersCount; ++i) {
sharedObservable
.pipe(
tap((message) => console.log(`SharedObservable: Incoming for ${message.id}`)),
filter((message) => message.id === getObserverId(i)),
tap(() => console.log(`Filtered for ${getObserverId(i)}`))
)
.subscribe((message) => {
const date = new Date();
message.endTime = date.getMilliseconds();
completedMessages.push(message);
})
}
// send out messages
for (i = 0; i < observersCount; ++i) {
for (j = 0; j < messagesPerObserver; ++j) {
const date = new Date();
const message = {
id: getObserverId(i),
startTime: date.getMilliseconds()
}
// send message
source.next(message);
}
}
// process data (get average message latency)
const totalMessageLatency = completedMessages.reduce(
(accumulatedLatency, currentMessage) => {
const currentMessageLatency =
currentMessage.endTime - currentMessage.startTime;
return accumulatedLatency + currentMessageLatency;
}, 0);
const averageLatency = totalMessageLatency / completedMessages.length;
console.log("==============================================================================");
console.log(`Observers: ${observersCount}, MessagesPerObserver: ${messagesPerObserver}`);
console.log(`Total Messages Sent: ${observersCount * messagesPerObserver}`);
console.log(`Total Messages Received: ${completedMessages.length}`);
console.log(`Average Latency per Message: ${averageLatency}`);
console.log("==============================================================================");
return averageLatency;
}
После завершения работы, если "Всего отправленных сообщений" равно x , тогда "Всего полученных сообщений" будет х ^ 2