Я реализовал, вероятно, очень наивный механизм очередей разветвления в памяти, например:
public class ObservableQueue<T> : IObservableQueue<T>
{
private readonly Subject<T> queue;
public IObservable<T> Messages => queue.AsObservable();
public ObservableQueue()
{
queue = new Subject<T>();
}
public void Enqueue(T item)
{
queue.OnNext(item);
}
public void Enqueue(List<T> items)
{
items.ForEach(queue.OnNext);
}
}
Причина, по которой я выбрал эту реализацию, заключается в том, что она допускает чрезвычайно выразительные подписки, которые яЯ большой поклонник, вот так:
subscription = queue.Messages
.Select(data => data.ToJson())
.Buffer(TimeSpan.FromSeconds(10), ByteSize.FromBytes(128))
.Where(Enumerable.Any)
.Select(ToQuery)
.Subscribe(query => db.Execute(query));
Используемый здесь метод Buffer
, опять же, несколько наивная реализация мной:
public static IObservable<IList<string>> Buffer(this IObservable<string> source, TimeSpan timeSpan, ByteSize size)
{
// Completes when the `timespan` has elapsed
var timer = Observable.Timer(timeSpan).Select(_ => new Unit());
// Completes when the ByteSize exceeds `size`
var bytes = source
.Scan(ByteSize.FromBytes(0),
(a, b) => a + ByteSize.FromBytes(Encoding.Unicode.GetByteCount(b)))
.SkipWhile(a => a < size)
.Select(_ => new Unit()); // We only want to use these for notification, and both observables
// need to be of the same type, so we just emit Unit
// Amb races the two observables to see which one finishes first, which then propagates the notification
// and signals the source to strop buffering
return source.Buffer(() => Observable.Amb(timer, bytes));
}
Теперь, каккак только я создаю подписку во втором блоке кода, происходит абсолютное увеличение использования памяти, виновником которого является Subject<T>
в ObservableQueue<T>
.Я должен подчеркнуть, что на тот момент не фактические данные были Enqueue
d.subscription
был создан и все еще ожидает каких-либо данных для фактической работы.
Здесь я вижу некоторых потенциальных виновников:
queue.AsObservable()
- Пользовательский
Buffer
метод, который я написал - Тот факт, что
subscription
является долгоживущим
Тем не менее, я не смог точно определить истинную причину здесь,Любые идеи?
Примечание: я думаю, это довольно очевидно, я не очень хорошо знаком с System.Reactive
, поэтому я прошу прощения, если я написал что-нибудь глупое здесь.