Я пытаюсь написать прокси-класс для IMongoCollection
, чтобы я мог использовать кэш в памяти для некоторых реализаций метода.Проблема, однако, заключается в том, что почти все фильтры имеют тип FilterDefinition<T>
, что означает, что мы можем вызвать Render
для них, чтобы получить BsonDocument
.Мне интересно, есть ли способ преобразовать фильтр BsonDocument
в динамическое выражение, чтобы я мог запустить его в моей памяти List<T>
.Или, может быть, есть лучший подход для кэширования в памяти, о котором я не знаю.Спасибо.
Обновление:
Мне хотелось написать решение, как предложил @ simon-mourier, но проблема с этим хакерским решением заключается в том, что драйвер C # mongo возвращает IAsyncCursor<T>
для операций поиска, которыев основном это поток BsonDocument
с, и после каждого чтения он указывает на последний индекс и удаляет себя.И нет способа сбросить поток в исходное положение.Это означает, что следующий код работает в первый раз, но после этого мы получаем исключение, когда курсор находится в конце потока и уже расположен.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using DAL.Extensions;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;
namespace DAL.Proxies
{
public static class MongoCollectionProxy
{
private static readonly Dictionary<Type, object> _instances = new Dictionary<Type, object>();
public static IMongoCollection<T> New<T>(IMongoCollection<T> proxy)
{
return ((IMongoCollection<T>)_instances.AddOrUpdate(typeof(T), () => new MongoCollectionBaseProxyImpl<T>(proxy)));
}
}
public class MongoCollectionBaseProxyImpl<T> : MongoCollectionBase<T>
{
private readonly IMongoCollection<T> _proxy;
private readonly ConcurrentDictionary<string, object> _cache = new ConcurrentDictionary<string, object>();
public MongoCollectionBaseProxyImpl(IMongoCollection<T> proxy)
{
_proxy = proxy;
}
public override Task<IAsyncCursor<TResult>> AggregateAsync<TResult>(PipelineDefinition<T, TResult> pipeline,
AggregateOptions options = null,
CancellationToken cancellationToken = new CancellationToken())
{
return _proxy.AggregateAsync(pipeline, options, cancellationToken);
}
public override Task<BulkWriteResult<T>> BulkWriteAsync(IEnumerable<WriteModel<T>> requests,
BulkWriteOptions options = null,
CancellationToken cancellationToken = new CancellationToken())
{
return _proxy.BulkWriteAsync(requests, options, cancellationToken);
}
[Obsolete("Use CountDocumentsAsync or EstimatedDocumentCountAsync instead.")]
public override Task<long> CountAsync(FilterDefinition<T> filter, CountOptions options = null,
CancellationToken cancellationToken = new CancellationToken())
{
return _proxy.CountAsync(filter, options, cancellationToken);
}
public override Task<IAsyncCursor<TField>> DistinctAsync<TField>(FieldDefinition<T, TField> field,
FilterDefinition<T> filter, DistinctOptions options = null,
CancellationToken cancellationToken = new CancellationToken())
{
return _proxy.DistinctAsync(field, filter, options, cancellationToken);
}
public override async Task<IAsyncCursor<TProjection>> FindAsync<TProjection>(FilterDefinition<T> filter,
FindOptions<T, TProjection> options = null,
CancellationToken cancellationToken = new CancellationToken())
{
// ReSharper disable once SpecifyACultureInStringConversionExplicitly
return await CacheResult(filter.Render().ToString(), () => _proxy.FindAsync(filter, options, cancellationToken));
}
public override async Task<TProjection> FindOneAndDeleteAsync<TProjection>(FilterDefinition<T> filter,
FindOneAndDeleteOptions<T, TProjection> options = null,
CancellationToken cancellationToken = new CancellationToken())
{
return await InvalidateCache(_proxy.FindOneAndDeleteAsync(filter, options, cancellationToken));
}
public override async Task<TProjection> FindOneAndReplaceAsync<TProjection>(FilterDefinition<T> filter,
T replacement,
FindOneAndReplaceOptions<T, TProjection> options = null,
CancellationToken cancellationToken = new CancellationToken())
{
return await InvalidateCache(_proxy.FindOneAndReplaceAsync(filter, replacement, options,
cancellationToken));
}
public override async Task<TProjection> FindOneAndUpdateAsync<TProjection>(FilterDefinition<T> filter,
UpdateDefinition<T> update,
FindOneAndUpdateOptions<T, TProjection> options = null,
CancellationToken cancellationToken = new CancellationToken())
{
return await InvalidateCache(_proxy.FindOneAndUpdateAsync(filter, update, options, cancellationToken));
}
public override Task<IAsyncCursor<TResult>> MapReduceAsync<TResult>(BsonJavaScript map, BsonJavaScript reduce,
MapReduceOptions<T, TResult> options = null,
CancellationToken cancellationToken = new CancellationToken())
{
return _proxy.MapReduceAsync(map, reduce, options, cancellationToken);
}
public override IFilteredMongoCollection<TDerivedDocument> OfType<TDerivedDocument>()
{
return _proxy.OfType<TDerivedDocument>();
}
public override IMongoCollection<T> WithReadPreference(ReadPreference readPreference)
{
return _proxy.WithReadPreference(readPreference);
}
public override IMongoCollection<T> WithWriteConcern(WriteConcern writeConcern)
{
return _proxy.WithWriteConcern(writeConcern);
}
public override CollectionNamespace CollectionNamespace => _proxy.CollectionNamespace;
public override IMongoDatabase Database => _proxy.Database;
public override IBsonSerializer<T> DocumentSerializer => _proxy.DocumentSerializer;
public override IMongoIndexManager<T> Indexes => _proxy.Indexes;
public override MongoCollectionSettings Settings => _proxy.Settings;
private async Task<TResult> CacheResult<TResult>(string key, Func<Task<TResult>> result)
{
return _cache.ContainsKey(key) ? (TResult) _cache[key] : (TResult) _cache.AddOrUpdate(key, await result());
}
private TResult InvalidateCache<TResult>(TResult result)
{
_cache.Clear();
return result;
}
}
}