Одновременно программно увеличиваются числа, связанные с данным идентификатором в порядке в таблице PostgreSql, возможно? - PullRequest
2 голосов
/ 22 апреля 2020

У меня слишком упрощенное хранилище событий, описанное ниже как таблица PostgreSQL:

CREATE TABLE IF NOT EXISTS public.my_events(
    id                BIGSERIAL PRIMARY KEY,
    stream_id         TEXT NOT NULL,
    stream_version    INT NOT NULL,
    CONSTRAINT my_events_stream_id_and_version UNIQUE(stream_id, stream_version));
CREATE INDEX my_events_idx_stream_id ON public.my_events (stream_id);
CREATE INDEX my_events_idx_stream_version

Я увеличиваю stream_version всякий раз, когда вставляю новую запись с:

INSERT INTO public.my_events(stream_id, stream_version)
VALUES(@stream_id, @stream_version)

В таблице может быть несколько значений stream_id, поэтому значения stream_version (всегда начинающиеся с цифры 1) не обязательно должны быть смежными до тех пор, пока они в порядке:

---------------------------------------
|  id  |  stream_id  | stream_version |
---------------------------------------
| xxxx |      A      |       1        |
| xxxx |      A      |       2        |
| xxxx |      A      |       3        |
| xxxx |      A      |       4        |
| xxxx |      A      |       5        |
| xxxx |      B      |       1        |
| xxxx |      B      |       2        |
| xxxx |      A      |       6        |
| xxxx |      B      |       3        |
| xxxx |      B      |       4        |
| xxxx |      B      |       5        |
---------------------------------------

Просто чтобы прояснить этот супер-пупер, выше есть два потока с соответствующими версиями:

id = A: 1, 2, 3, 4, 5, 6
id = B: 1, 2, 3, 4, 5

Вот как логика c реализована в C#

public class MyEvent
{
    public long Id { get; set; }
    public string StreamId { get; set; }
    public int StreamVersion { get; set; }

    public override string ToString()
    {
        return $"{nameof(Id)}: {Id}, {nameof(StreamId)}: {StreamId}, {nameof(StreamVersion)}: {StreamVersion}";
    }
}

public class MyEventStore
{
    private readonly string _connectionString;
    private const int FirstStreamVersion = 1;

    public MyEventStore(string connectionString) => 
        _connectionString = connectionString;

    private NpgsqlConnection CreateConnection() => 
        new NpgsqlConnection(_connectionString);

    public async Task DropTableAsync()
    {
        const string statement = 
            @"DROP TABLE IF EXISTS public.my_events";

        await using var connection = CreateConnection();
        await connection.ExecuteAsync(statement);
    }

    public async Task CreateTableAsync()
    {
        const string statement = 
            @"CREATE TABLE IF NOT EXISTS public.my_events(
                id                BIGSERIAL PRIMARY KEY,
                stream_id         TEXT NOT NULL,
                stream_version    INT NOT NULL,
                CONSTRAINT my_events_stream_id_and_version UNIQUE(stream_id, stream_version));
                CREATE INDEX my_events_idx_stream_id ON public.my_events (stream_id);
                CREATE INDEX my_events_idx_stream_version ON public.my_events (stream_version);";

        await using var connection = CreateConnection();
        await connection.ExecuteAsync(statement);
    }

    private async Task<int?> SelectLastStreamVersionAsync(NpgsqlConnection connection, string streamId)
    {
        const string statement =
            @"SELECT stream_version
              FROM public.my_events
              WHERE stream_id = @stream_id
              ORDER BY stream_version DESC
              LIMIT 1";

        var parameters = new DynamicParameters();
        parameters.Add("stream_id", streamId);

        var result = await connection.QuerySingleOrDefaultAsync<int>(statement, parameters);
        return result < 1 
            ? new int?() 
            : result;
    }

    private async Task InsertEventsAsync(NpgsqlConnection connection, string streamId, int? startVersion, int eventCount)
    {
        const string insertIntoStatement =
            @"INSERT INTO public.my_events(stream_id, stream_version)
              VALUES(@stream_id, @stream_version)";

        var streamVersion = startVersion ?? 0;

        for (var i = 0; i < eventCount; i++)
        {
            streamVersion++;

            var parameters = new DynamicParameters();
            parameters.Add("stream_id", streamId);
            parameters.Add("stream_version", streamVersion);
            await connection.ExecuteAsync(insertIntoStatement, parameters);
        }
    }

    public async Task AppendEventsAsync(string streamId, int eventCount)
    {
        var transactionOptions = new TransactionOptions
        {
            IsolationLevel = IsolationLevel.ReadCommitted
        };
        using var transactionScope = new TransactionScope(
            TransactionScopeOption.Required, 
            transactionOptions,
            TransactionScopeAsyncFlowOption.Enabled);
        await using var connection = CreateConnection();
        var streamVersion = await SelectLastStreamVersionAsync(connection, streamId);
        await InsertEventsAsync(connection, streamId, streamVersion, eventCount);
        transactionScope.Complete();
    }

    public async IAsyncEnumerable<MyEvent> FetchStreamEventsAsync(string streamId)
    {
        await using var connection = CreateConnection();
        const string statement =
            @"SELECT ctid, * FROM public.my_events
              WHERE stream_id = @stream_id";
        var parameters = new DynamicParameters();
        parameters.Add("stream_id", streamId);
        await using var reader = await connection.ExecuteReaderAsync(statement, parameters);
        while (await reader.ReadAsync())
        {
            yield return new MyEvent
            {
                Id = reader.GetFieldValue<long>("id"),
                StreamId = reader.GetFieldValue<string>("stream_id"),
                StreamVersion = reader.GetFieldValue<int>("stream_version"),
            };
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var connectionStringBuilder = new NpgsqlConnectionStringBuilder
        {
            Host = "localhost",
            Port = 5432,
            Username = "root", 
            Password = "root", 
            Database = "postgres",
            Pooling = true
        };
        var connectionString = connectionStringBuilder.ToString();
        var eventStore = new MyEventStore(connectionString);

        await eventStore.DropTableAsync();
        await eventStore.CreateTableAsync();

        const string streamId = "MyEvent";

        var tasks =
            Enumerable
                .Range(1, 25)
                .Select(x =>
                {
                    var policy = Policy
                        .Handle<PostgresException>(e => e.SqlState == "23505")
                        .RetryForeverAsync();

                    return policy.ExecuteAsync(async () => await eventStore.AppendEventsAsync(streamId, 2));
                });

        await Task.WhenAll(tasks);

        var streamEvents = await eventStore.FetchStreamEventsAsync(streamId).ToListAsync();
        Console.WriteLine(string.Join(Environment.NewLine, streamEvents));
    }
}
Id: 1, StreamId: MyEvent, StreamVersion: 1
Id: 2, StreamId: MyEvent, StreamVersion: 2
Id: 3, StreamId: MyEvent, StreamVersion: 3
Id: 11, StreamId: MyEvent, StreamVersion: 4
Id: 15, StreamId: MyEvent, StreamVersion: 5
Id: 22, StreamId: MyEvent, StreamVersion: 6
Id: 37, StreamId: MyEvent, StreamVersion: 7
Id: 48, StreamId: MyEvent, StreamVersion: 8
Id: 58, StreamId: MyEvent, StreamVersion: 9
Id: 68, StreamId: MyEvent, StreamVersion: 10
Id: 77, StreamId: MyEvent, StreamVersion: 11
Id: 78, StreamId: MyEvent, StreamVersion: 12
Id: 95, StreamId: MyEvent, StreamVersion: 13
Id: 96, StreamId: MyEvent, StreamVersion: 14
Id: 97, StreamId: MyEvent, StreamVersion: 15
Id: 106, StreamId: MyEvent, StreamVersion: 16
Id: 116, StreamId: MyEvent, StreamVersion: 17
Id: 117, StreamId: MyEvent, StreamVersion: 18
Id: 125, StreamId: MyEvent, StreamVersion: 19
Id: 129, StreamId: MyEvent, StreamVersion: 20
Id: 133, StreamId: MyEvent, StreamVersion: 21
Id: 143, StreamId: MyEvent, StreamVersion: 22
Id: 145, StreamId: MyEvent, StreamVersion: 23
Id: 147, StreamId: MyEvent, StreamVersion: 24
Id: 152, StreamId: MyEvent, StreamVersion: 25
Id: 153, StreamId: MyEvent, StreamVersion: 26
Id: 157, StreamId: MyEvent, StreamVersion: 27
Id: 161, StreamId: MyEvent, StreamVersion: 28
Id: 177, StreamId: MyEvent, StreamVersion: 35
Id: 179, StreamId: MyEvent, StreamVersion: 36
Id: 181, StreamId: MyEvent, StreamVersion: 37
Id: 184, StreamId: MyEvent, StreamVersion: 38
Id: 190, StreamId: MyEvent, StreamVersion: 39
Id: 192, StreamId: MyEvent, StreamVersion: 40
Id: 202, StreamId: MyEvent, StreamVersion: 43
Id: 204, StreamId: MyEvent, StreamVersion: 44
Id: 162, StreamId: MyEvent, StreamVersion: 29
Id: 164, StreamId: MyEvent, StreamVersion: 30
Id: 166, StreamId: MyEvent, StreamVersion: 31
Id: 167, StreamId: MyEvent, StreamVersion: 32
Id: 170, StreamId: MyEvent, StreamVersion: 33
Id: 175, StreamId: MyEvent, StreamVersion: 34
Id: 196, StreamId: MyEvent, StreamVersion: 41
Id: 198, StreamId: MyEvent, StreamVersion: 42
Id: 211, StreamId: MyEvent, StreamVersion: 47
Id: 213, StreamId: MyEvent, StreamVersion: 48
Id: 214, StreamId: MyEvent, StreamVersion: 49
Id: 215, StreamId: MyEvent, StreamVersion: 50
Id: 207, StreamId: MyEvent, StreamVersion: 45
Id: 209, StreamId: MyEvent, StreamVersion: 46

Как показано выше в одном потоке, у меня есть некоторые проблемы с параллельными транзакциями, я хотел бы знать, как я могу предотвратить это, чтобы версии были вставлены в правильном порядке для данного потока .

Например, в приведенном выше примере у нас есть не правильно упорядоченные версии:

Id: 202, StreamId: MyEvent, StreamVersion: 43
Id: 204, StreamId: MyEvent, StreamVersion: 44
Id: 162, StreamId: MyEvent, StreamVersion: 29
Id: 164, StreamId: MyEvent, StreamVersion: 30
Id: 215, StreamId: MyEvent, StreamVersion: 50
Id: 207, StreamId: MyEvent, StreamVersion: 45
Id: 209, StreamId: MyEvent, StreamVersion: 46
...