Наблюдаемая передача по каналу через `share ()` вызывает одного наблюдателя ненужное количество раз - PullRequest
0 голосов
/ 19 сентября 2018

Я пытаюсь проверить задержку сообщений, отправленных через общую / горячую наблюдаемую.Я заметил, что когда у меня есть несколько наблюдателей в одной общей наблюдаемой, один наблюдатель вызывается n раз из одного сообщения (где n - количество наблюдателей в общей наблюдаемой).

Я запустил приведенный ниже код с 10 наблюдателями и 1 сообщением на наблюдателя , и каждый наблюдатель вызывается 10 раз для каждого сообщения (что подразумевает общее количество наблюдателей 100.next ()звонки).Из моего понимания наблюдателей / наблюдаемых, каждый наблюдатель должен вызываться только один раз за сообщение .Я просто неправильно использую оператор share()?Или мое понимание этого в целом некорректно?

const getMessageLatency = (observersCount, messagesPerObserver) => {
    const completedMessages = [];
    const source = new Subject();
    const sharedObservable = source.pipe(
        tap((message) => console.log(`Subject: Incoming for ${message.id}`)),
        share()
    );

    // Setup observers
    for (i = 0; i < observersCount; ++i) {
        sharedObservable
        .pipe(
            tap((message) => console.log(`SharedObservable: Incoming for ${message.id}`)),
            filter((message) => message.id === getObserverId(i)),
            tap(() => console.log(`Filtered for ${getObserverId(i)}`))
        )
        .subscribe((message) => {
            const date = new Date();
            message.endTime = date.getMilliseconds();
            completedMessages.push(message);
        })
    }

    // send out messages
    for (i = 0; i < observersCount; ++i) {
        for (j = 0; j < messagesPerObserver; ++j) {
            const date = new Date();
            const message = {
                id: getObserverId(i),
                startTime: date.getMilliseconds()
            }

            // send message
            source.next(message);
        }
    }

    // process data (get average message latency)
    const totalMessageLatency = completedMessages.reduce(
        (accumulatedLatency, currentMessage) => {
            const currentMessageLatency = 
                currentMessage.endTime - currentMessage.startTime;
            return accumulatedLatency + currentMessageLatency;
        }, 0);
    const averageLatency = totalMessageLatency / completedMessages.length;

    console.log("==============================================================================");
    console.log(`Observers: ${observersCount}, MessagesPerObserver: ${messagesPerObserver}`);
    console.log(`Total Messages Sent: ${observersCount * messagesPerObserver}`);
    console.log(`Total Messages Received: ${completedMessages.length}`);
    console.log(`Average Latency per Message: ${averageLatency}`);
    console.log("==============================================================================");

    return averageLatency;
}

После завершения работы, если "Всего отправленных сообщений" равно x , тогда "Всего полученных сообщений" будет х ^ 2

1 Ответ

0 голосов
/ 19 сентября 2018

Добавлено let в объявления моих циклов for.Вы можете сказать, что я новичок в JavaScript тоже.

Спасибо, cartant

...