MongoDB как блокировка с помощью WriteConcern Majority и ReadConcern Linearizable - PullRequest
0 голосов
/ 07 января 2019

Я получил одно место в своем приложении, где я хочу использовать Mongo (3.6) в качестве блокировки нескольких потоков (на разных серверах). По сути, что-то вроде «если один поток начал работать, другие потоки должны видеть его через Монго и не начинать ту же работу параллельно».

Из документации я узнал

В сочетании с «записью большинства» принцип «линеаризуемого чтения» позволяет нескольким потокам выполнять чтение и запись в одном документе, как если бы один поток выполнял эти операции в реальном времени;

Так что это звучит хорошо для меня, я вставляю определенный документ, если один поток начинает работать, и другие потоки проверяют, существует ли такой документ, и не запускают, если так, но это не работает для моего случая.

Я подготовил два теста - один непараллельный, который успешно блокирует второй поток - но параллельный тест не пройден, и я получил два из этих RebuildLog документов.

using System;
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;
using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Driver;

namespace FindOneAndUpdateTests
{
    public class FindOneAndUpdateTests
    {
        private static IMongoDatabase GetDatabase()
        {
            var dbName = "test";
            var client = new MongoClient("mongodb://localhost:45022");

            return client.GetDatabase(dbName);
        }

        private IMongoCollection<RebuildLog> GetCollection()
        {
            return GetDatabase().GetCollection<RebuildLog>("RebuildLog");
        }

        [Fact]
        public async Task FindOneAndUpdate_NotParallel_Test()
        {
            var dlpId = Guid.NewGuid();

            var first = await FindOneAndUpdateMethod(dlpId);
            var second = await FindOneAndUpdateMethod(dlpId);

            first.Should().BeFalse();
            second.Should().BeTrue();
        }

        [Fact]
        public async Task FindOneAndUpdate_Parallel_Test()
        {
            var dlpId = Guid.NewGuid();

            var taskFirst = FindOneAndUpdateMethod(dlpId);
            var taskSecond = FindOneAndUpdateMethod(dlpId);

            var first = await taskFirst;
            var second = await taskSecond;

            first.Should().BeFalse();
            second.Should().BeTrue();
        }

        private async Task<bool> FindOneAndUpdateMethod(Guid dlpId)
        {
            var mongoCollection = GetCollection();

            var filterBuilder = Builders<RebuildLog>.Filter;
            var filter = filterBuilder.Where(w => w.DlpId == dlpId);

            var creator = Builders<RebuildLog>.Update
                .SetOnInsert(w => w.DlpId, dlpId)
                .SetOnInsert(w => w.ChangeDate, DateTime.UtcNow)
                .SetOnInsert(w => w.BuildDate, DateTime.UtcNow)
                .SetOnInsert(w => w.Id, Guid.NewGuid());

            var options = new FindOneAndUpdateOptions<RebuildLog>
            {
                IsUpsert = true,
                ReturnDocument = ReturnDocument.Before
            };

            var result = await mongoCollection
                .WithWriteConcern(WriteConcern.WMajority)
                .WithReadConcern(ReadConcern.Linearizable)
                .FindOneAndUpdateAsync(filter, creator, options);

            return result != null;
        }
    }

    [BsonIgnoreExtraElements]
    public class RebuildLog
    {
        public RebuildLog()
        {
            Id = Guid.NewGuid();
        }

        public Guid Id { get; set; }
        public DateTime ChangeDate { get; set; }
        public string ChangeUser { get; set; }
        public Guid DlpId { get; set; }
        public string Portal { get; set; }
        public DateTime? BuildDate { get; set; }
    }
}

Я подозреваю, что моя идея с атомарным созданным вручную GetOrInsert (см. FindOneAndUpdate с IsUpsert) нарушает ограничение «на один документ» в документации. Любая идея, чтобы это исправить или это просто невозможно?

1 Ответ

0 голосов
/ 15 мая 2019

Это интересно. Может быть, у вас нет уникального индекса по DlpId? Вот почему Монго решает, что последовательное выполнение этих операций не является необходимым, потому что в вашем случае это не шаблон записи-затем-чтения (как указано в «Клиентских сеансах и гарантиях причинно-следственной согласованности»). Это обновить или создать два раза одновременно. Как насчет этого? :

public class SyncDocument 
{
    // ...
    [BsonElement("locked"), BsonDefaultValue(false)]
    public bool Locked { get; set; }
}

В коде клиента:

var filter = Builders<SyncDocument>.Filter.Eq(d => d.Locked, false);
var update = Builders<SyncDocument>.Update.Set(d => d.Locked, true);
var result = collection.UpdateOne(filter, update);
if (result.ModifiedCount == 1) {
    Console.WriteLine("Lock acquired");
}

Документ с полем Locked должен быть создан до запуска приложений (если он применим для вашей задачи).

...