Интересная проблема.Первое, что нужно сделать, это написать какую-то функцию сбора частот.Это может выглядеть так:
public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback)
{
return source.GetFrequency(measuringFreq, lookback, Scheduler.Default);
}
public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler)
{
return source.Buffer(lookback, measuringFreq, scheduler)
.Select(l => l.Count);
}
Если measuringFreq
равен 1 секунде, а lookback
равен 5 секундам, это означает, что каждую секунду мы будем видеть количество сообщений, доставленных за последние5 секунд.Быстрый и грязный пример:
var r = new System.Random();
var nums = Observable.Generate(
0,
i => i < 100,
i => i + 1,
i => i, _ => TimeSpan.FromSeconds(r.NextDouble() * 1)
);
var freq = nums.GetFrequency(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5));
freq.Dump(); //Linqpad
nums
- это наблюдаемое, которое должно генерировать сообщение в среднем каждые полсекунды (оно выбирает длительность случайным образом от 0 до 1 секунды).freq
генерирует значение каждую секунду, которое возвращает количество сообщений nums
, созданных за последние 5 секунд (что в среднем должно составлять 10).При последнем запуске на моей машине я получаю следующее:
11
11
12
10
12
11
9
9
10
9
8
...
Когда у нас есть способ получить частоту, вам нужно написать функцию для синтеза двух наблюдаемых типа одного типа, переключаясь на основе частоты,Я написал это:
public static IObservable<T> MaintainFrequencyImproper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler, int aAdvantage = 0)
{
var aFreq = sourceA.GetFrequency(measuringFreq, lookback, scheduler);
var bFreq = sourceB.GetFrequency(measuringFreq, lookback, scheduler);
var toReturn = aFreq.Zip(bFreq, (a, b) => a + aAdvantage - b)
.Select(freqDifference => freqDifference < 0 ? sourceB : sourceA) //If advantage is 0, and a & b both popped out 5 messages in the last second, then A wins
.StartWith(sourceA)
.Switch();
return toReturn;
}
Сначала мы получаем частоту с GetFrequency
для обеих наблюдаемых, затем мы объединяем эти два вместе и сравниваем их.Если B чаще, чем A, тогда используйте B. Если они эквивалентны или A чаще, то используйте A.
Переменная aAdvantage
позволяет вам выразить более сильное предпочтение A над B.0 (по умолчанию) означает, что источник A выигрывает ничью, или когда это происходит чаще, но в противном случае выигрывает B.2 означало бы, что B должен будет выдать на 3 сообщения больше, чем A, за последний период, чтобы потребовать использования B.
При наличии Publishing
наблюдаемых, чтобы избежать множественных подписок, это выглядело бы так:
public static IObservable<T> MaintainFrequencyProper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback,
IScheduler scheduler, int aAdvantage = 0)
{
return sourceA.Publish(_sourceA => sourceB.Publish(_sourceB =>
_sourceA.GetFrequency(measuringFreq, lookback, scheduler)
.Zip(_sourceB.GetFrequency(measuringFreq, lookback, scheduler), (a, b) => a + aAdvantage - b)
.Select(freqDifference => freqDifference < 0 ? _sourceB : _sourceA)
.StartWith(_sourceA)
.Switch()
))
}
Надеюсь, это поможет.Вы не оставили много с точки зрения того, как вписать это в свой код.Если вы хотите, пожалуйста, включите mcve .