Огромная утечка памяти с долгоживущей подпиской на Rx Subject - PullRequest
0 голосов
/ 04 июля 2019

Я реализовал, вероятно, очень наивный механизм очередей разветвления в памяти, например:

public class ObservableQueue<T> : IObservableQueue<T>
{
    private readonly Subject<T> queue;

    public IObservable<T> Messages => queue.AsObservable();

    public ObservableQueue()
    {
        queue = new Subject<T>();
    }

    public void Enqueue(T item)
    {
        queue.OnNext(item);
    }

    public void Enqueue(List<T> items)
    {
        items.ForEach(queue.OnNext);
    }
}

Причина, по которой я выбрал эту реализацию, заключается в том, что она допускает чрезвычайно выразительные подписки, которые яЯ большой поклонник, вот так:

subscription = queue.Messages
    .Select(data => data.ToJson())
    .Buffer(TimeSpan.FromSeconds(10), ByteSize.FromBytes(128))
    .Where(Enumerable.Any)
    .Select(ToQuery)
    .Subscribe(query => db.Execute(query));

Используемый здесь метод Buffer, опять же, несколько наивная реализация мной:

public static IObservable<IList<string>> Buffer(this IObservable<string> source, TimeSpan timeSpan, ByteSize size) 
{
    // Completes when the `timespan` has elapsed
    var timer = Observable.Timer(timeSpan).Select(_ => new Unit());
    // Completes when the ByteSize exceeds `size`
    var bytes = source
        .Scan(ByteSize.FromBytes(0),
                    (a, b) => a + ByteSize.FromBytes(Encoding.Unicode.GetByteCount(b)))
        .SkipWhile(a => a < size)
        .Select(_ => new Unit()); // We only want to use these for notification, and both observables
                                  // need to be of the same type, so we just emit Unit

    // Amb races the two observables to see which one finishes first, which then propagates the notification
    // and signals the source to strop buffering
    return source.Buffer(() => Observable.Amb(timer, bytes));
}

Теперь, каккак только я создаю подписку во втором блоке кода, происходит абсолютное увеличение использования памяти, виновником которого является Subject<T> в ObservableQueue<T>.Я должен подчеркнуть, что на тот момент не фактические данные были Enqueue d.subscription был создан и все еще ожидает каких-либо данных для фактической работы.

Здесь я вижу некоторых потенциальных виновников:

  • queue.AsObservable()
  • Пользовательский Buffer метод, который я написал
  • Тот факт, что subscription является долгоживущим

Тем не менее, я не смог точно определить истинную причину здесь,Любые идеи?

Примечание: я думаю, это довольно очевидно, я не очень хорошо знаком с System.Reactive, поэтому я прошу прощения, если я написал что-нибудь глупое здесь.

...