У меня слишком упрощенное хранилище событий, описанное ниже как таблица PostgreSQL:
CREATE TABLE IF NOT EXISTS public.my_events(
id BIGSERIAL PRIMARY KEY,
stream_id TEXT NOT NULL,
stream_version INT NOT NULL,
CONSTRAINT my_events_stream_id_and_version UNIQUE(stream_id, stream_version));
CREATE INDEX my_events_idx_stream_id ON public.my_events (stream_id);
CREATE INDEX my_events_idx_stream_version
Я увеличиваю stream_version
всякий раз, когда вставляю новую запись с:
INSERT INTO public.my_events(stream_id, stream_version)
VALUES(@stream_id, @stream_version)
В таблице может быть несколько значений stream_id
, поэтому значения stream_version
(всегда начинающиеся с цифры 1) не обязательно должны быть смежными до тех пор, пока они в порядке:
---------------------------------------
| id | stream_id | stream_version |
---------------------------------------
| xxxx | A | 1 |
| xxxx | A | 2 |
| xxxx | A | 3 |
| xxxx | A | 4 |
| xxxx | A | 5 |
| xxxx | B | 1 |
| xxxx | B | 2 |
| xxxx | A | 6 |
| xxxx | B | 3 |
| xxxx | B | 4 |
| xxxx | B | 5 |
---------------------------------------
Просто чтобы прояснить этот супер-пупер, выше есть два потока с соответствующими версиями:
id = A: 1, 2, 3, 4, 5, 6
id = B: 1, 2, 3, 4, 5
Вот как логика c реализована в C#
public class MyEvent
{
public long Id { get; set; }
public string StreamId { get; set; }
public int StreamVersion { get; set; }
public override string ToString()
{
return $"{nameof(Id)}: {Id}, {nameof(StreamId)}: {StreamId}, {nameof(StreamVersion)}: {StreamVersion}";
}
}
public class MyEventStore
{
private readonly string _connectionString;
private const int FirstStreamVersion = 1;
public MyEventStore(string connectionString) =>
_connectionString = connectionString;
private NpgsqlConnection CreateConnection() =>
new NpgsqlConnection(_connectionString);
public async Task DropTableAsync()
{
const string statement =
@"DROP TABLE IF EXISTS public.my_events";
await using var connection = CreateConnection();
await connection.ExecuteAsync(statement);
}
public async Task CreateTableAsync()
{
const string statement =
@"CREATE TABLE IF NOT EXISTS public.my_events(
id BIGSERIAL PRIMARY KEY,
stream_id TEXT NOT NULL,
stream_version INT NOT NULL,
CONSTRAINT my_events_stream_id_and_version UNIQUE(stream_id, stream_version));
CREATE INDEX my_events_idx_stream_id ON public.my_events (stream_id);
CREATE INDEX my_events_idx_stream_version ON public.my_events (stream_version);";
await using var connection = CreateConnection();
await connection.ExecuteAsync(statement);
}
private async Task<int?> SelectLastStreamVersionAsync(NpgsqlConnection connection, string streamId)
{
const string statement =
@"SELECT stream_version
FROM public.my_events
WHERE stream_id = @stream_id
ORDER BY stream_version DESC
LIMIT 1";
var parameters = new DynamicParameters();
parameters.Add("stream_id", streamId);
var result = await connection.QuerySingleOrDefaultAsync<int>(statement, parameters);
return result < 1
? new int?()
: result;
}
private async Task InsertEventsAsync(NpgsqlConnection connection, string streamId, int? startVersion, int eventCount)
{
const string insertIntoStatement =
@"INSERT INTO public.my_events(stream_id, stream_version)
VALUES(@stream_id, @stream_version)";
var streamVersion = startVersion ?? 0;
for (var i = 0; i < eventCount; i++)
{
streamVersion++;
var parameters = new DynamicParameters();
parameters.Add("stream_id", streamId);
parameters.Add("stream_version", streamVersion);
await connection.ExecuteAsync(insertIntoStatement, parameters);
}
}
public async Task AppendEventsAsync(string streamId, int eventCount)
{
var transactionOptions = new TransactionOptions
{
IsolationLevel = IsolationLevel.ReadCommitted
};
using var transactionScope = new TransactionScope(
TransactionScopeOption.Required,
transactionOptions,
TransactionScopeAsyncFlowOption.Enabled);
await using var connection = CreateConnection();
var streamVersion = await SelectLastStreamVersionAsync(connection, streamId);
await InsertEventsAsync(connection, streamId, streamVersion, eventCount);
transactionScope.Complete();
}
public async IAsyncEnumerable<MyEvent> FetchStreamEventsAsync(string streamId)
{
await using var connection = CreateConnection();
const string statement =
@"SELECT ctid, * FROM public.my_events
WHERE stream_id = @stream_id";
var parameters = new DynamicParameters();
parameters.Add("stream_id", streamId);
await using var reader = await connection.ExecuteReaderAsync(statement, parameters);
while (await reader.ReadAsync())
{
yield return new MyEvent
{
Id = reader.GetFieldValue<long>("id"),
StreamId = reader.GetFieldValue<string>("stream_id"),
StreamVersion = reader.GetFieldValue<int>("stream_version"),
};
}
}
}
public static class Program
{
public static async Task Main()
{
var connectionStringBuilder = new NpgsqlConnectionStringBuilder
{
Host = "localhost",
Port = 5432,
Username = "root",
Password = "root",
Database = "postgres",
Pooling = true
};
var connectionString = connectionStringBuilder.ToString();
var eventStore = new MyEventStore(connectionString);
await eventStore.DropTableAsync();
await eventStore.CreateTableAsync();
const string streamId = "MyEvent";
var tasks =
Enumerable
.Range(1, 25)
.Select(x =>
{
var policy = Policy
.Handle<PostgresException>(e => e.SqlState == "23505")
.RetryForeverAsync();
return policy.ExecuteAsync(async () => await eventStore.AppendEventsAsync(streamId, 2));
});
await Task.WhenAll(tasks);
var streamEvents = await eventStore.FetchStreamEventsAsync(streamId).ToListAsync();
Console.WriteLine(string.Join(Environment.NewLine, streamEvents));
}
}
Id: 1, StreamId: MyEvent, StreamVersion: 1
Id: 2, StreamId: MyEvent, StreamVersion: 2
Id: 3, StreamId: MyEvent, StreamVersion: 3
Id: 11, StreamId: MyEvent, StreamVersion: 4
Id: 15, StreamId: MyEvent, StreamVersion: 5
Id: 22, StreamId: MyEvent, StreamVersion: 6
Id: 37, StreamId: MyEvent, StreamVersion: 7
Id: 48, StreamId: MyEvent, StreamVersion: 8
Id: 58, StreamId: MyEvent, StreamVersion: 9
Id: 68, StreamId: MyEvent, StreamVersion: 10
Id: 77, StreamId: MyEvent, StreamVersion: 11
Id: 78, StreamId: MyEvent, StreamVersion: 12
Id: 95, StreamId: MyEvent, StreamVersion: 13
Id: 96, StreamId: MyEvent, StreamVersion: 14
Id: 97, StreamId: MyEvent, StreamVersion: 15
Id: 106, StreamId: MyEvent, StreamVersion: 16
Id: 116, StreamId: MyEvent, StreamVersion: 17
Id: 117, StreamId: MyEvent, StreamVersion: 18
Id: 125, StreamId: MyEvent, StreamVersion: 19
Id: 129, StreamId: MyEvent, StreamVersion: 20
Id: 133, StreamId: MyEvent, StreamVersion: 21
Id: 143, StreamId: MyEvent, StreamVersion: 22
Id: 145, StreamId: MyEvent, StreamVersion: 23
Id: 147, StreamId: MyEvent, StreamVersion: 24
Id: 152, StreamId: MyEvent, StreamVersion: 25
Id: 153, StreamId: MyEvent, StreamVersion: 26
Id: 157, StreamId: MyEvent, StreamVersion: 27
Id: 161, StreamId: MyEvent, StreamVersion: 28
Id: 177, StreamId: MyEvent, StreamVersion: 35
Id: 179, StreamId: MyEvent, StreamVersion: 36
Id: 181, StreamId: MyEvent, StreamVersion: 37
Id: 184, StreamId: MyEvent, StreamVersion: 38
Id: 190, StreamId: MyEvent, StreamVersion: 39
Id: 192, StreamId: MyEvent, StreamVersion: 40
Id: 202, StreamId: MyEvent, StreamVersion: 43
Id: 204, StreamId: MyEvent, StreamVersion: 44
Id: 162, StreamId: MyEvent, StreamVersion: 29
Id: 164, StreamId: MyEvent, StreamVersion: 30
Id: 166, StreamId: MyEvent, StreamVersion: 31
Id: 167, StreamId: MyEvent, StreamVersion: 32
Id: 170, StreamId: MyEvent, StreamVersion: 33
Id: 175, StreamId: MyEvent, StreamVersion: 34
Id: 196, StreamId: MyEvent, StreamVersion: 41
Id: 198, StreamId: MyEvent, StreamVersion: 42
Id: 211, StreamId: MyEvent, StreamVersion: 47
Id: 213, StreamId: MyEvent, StreamVersion: 48
Id: 214, StreamId: MyEvent, StreamVersion: 49
Id: 215, StreamId: MyEvent, StreamVersion: 50
Id: 207, StreamId: MyEvent, StreamVersion: 45
Id: 209, StreamId: MyEvent, StreamVersion: 46
Как показано выше в одном потоке, у меня есть некоторые проблемы с параллельными транзакциями, я хотел бы знать, как я могу предотвратить это, чтобы версии были вставлены в правильном порядке для данного потока .
Например, в приведенном выше примере у нас есть не правильно упорядоченные версии:
Id: 202, StreamId: MyEvent, StreamVersion: 43
Id: 204, StreamId: MyEvent, StreamVersion: 44
Id: 162, StreamId: MyEvent, StreamVersion: 29
Id: 164, StreamId: MyEvent, StreamVersion: 30
Id: 215, StreamId: MyEvent, StreamVersion: 50
Id: 207, StreamId: MyEvent, StreamVersion: 45
Id: 209, StreamId: MyEvent, StreamVersion: 46