System.reactive - динамически переключаться по частоте - PullRequest
1 голос
/ 04 июля 2019

У меня есть два источника данных.

Давайте представим, что:

  • Система A обеспечивает более качественные данные с более высокой частотой, например,
    1прайс / 1сек, но иногда возникают сбои и отсутствуют данные или
    частота, например, 1 цена / 20 с
  • Система B предоставляет данные с более низкой частотой, например, 1price / 10сек

Существует ли какой-либо изящный способ использования system.reactive для обычного извлечения данных из системы A, но при сбое (без данных в фиде) или замедлении использования данных из системы B? Я хочу реализовать какой-то переключатель, который будет использовать источник A, когда он быстрее, чем B. Я не хочу смешивать источники, поэтому я могу использовать только SystemA или SystemB одновременно.


    class PriceFeed {

        public IObservable<Price> GetPricesFeed(IObservable<PriceFromA> pricesFromA, IObservable<PriceFromB> pricesFromB)
        {


        }


        private Price Convert(PriceFromA price) { //convert }

        private Price Convert(PriceFromB price) { //convert }

    }

1 Ответ

4 голосов
/ 05 июля 2019

Интересная проблема.Первое, что нужно сделать, это написать какую-то функцию сбора частот.Это может выглядеть так:

public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback)
{
    return source.GetFrequency(measuringFreq, lookback, Scheduler.Default);
}

public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler)
{
    return source.Buffer(lookback, measuringFreq, scheduler)
        .Select(l => l.Count);
}

Если measuringFreq равен 1 секунде, а lookback равен 5 секундам, это означает, что каждую секунду мы будем видеть количество сообщений, доставленных за последние5 секунд.Быстрый и грязный пример:

var r = new System.Random();
var nums = Observable.Generate(
    0, 
    i => i < 100, 
    i => i + 1, 
    i => i, _ => TimeSpan.FromSeconds(r.NextDouble() * 1)
);
var freq = nums.GetFrequency(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5));
freq.Dump(); //Linqpad

nums - это наблюдаемое, которое должно генерировать сообщение в среднем каждые полсекунды (оно выбирает длительность случайным образом от 0 до 1 секунды).freq генерирует значение каждую секунду, которое возвращает количество сообщений nums, созданных за последние 5 секунд (что в среднем должно составлять 10).При последнем запуске на моей машине я получаю следующее:

11
11
12
10
12
11
9
9
10
9
8
...

Когда у нас есть способ получить частоту, вам нужно написать функцию для синтеза двух наблюдаемых типа одного типа, переключаясь на основе частоты,Я написал это:

public static IObservable<T> MaintainFrequencyImproper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler, int aAdvantage = 0)
{
    var aFreq = sourceA.GetFrequency(measuringFreq, lookback, scheduler);
    var bFreq = sourceB.GetFrequency(measuringFreq, lookback, scheduler);

    var toReturn = aFreq.Zip(bFreq, (a, b) => a + aAdvantage - b)
        .Select(freqDifference => freqDifference < 0 ? sourceB : sourceA)   //If advantage is 0, and a & b both popped out 5 messages in the last second, then A wins
        .StartWith(sourceA)
        .Switch();

    return toReturn;
}

Сначала мы получаем частоту с GetFrequency для обеих наблюдаемых, затем мы объединяем эти два вместе и сравниваем их.Если B чаще, чем A, тогда используйте B. Если они эквивалентны или A чаще, то используйте A.

Переменная aAdvantage позволяет вам выразить более сильное предпочтение A над B.0 (по умолчанию) означает, что источник A выигрывает ничью, или когда это происходит чаще, но в противном случае выигрывает B.2 означало бы, что B должен будет выдать на 3 сообщения больше, чем A, за последний период, чтобы потребовать использования B.

При наличии Publishing наблюдаемых, чтобы избежать множественных подписок, это выглядело бы так:

public static IObservable<T> MaintainFrequencyProper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback, 
    IScheduler scheduler, int aAdvantage = 0)
{
    return sourceA.Publish(_sourceA => sourceB.Publish(_sourceB => 
        _sourceA.GetFrequency(measuringFreq, lookback, scheduler)
            .Zip(_sourceB.GetFrequency(measuringFreq, lookback, scheduler), (a, b) => a + aAdvantage - b)
            .Select(freqDifference => freqDifference < 0 ? _sourceB : _sourceA)
            .StartWith(_sourceA)
            .Switch()

    ))
}

Надеюсь, это поможет.Вы не оставили много с точки зрения того, как вписать это в свой код.Если вы хотите, пожалуйста, включите mcve .

...