Настройка EventStore NServiceBus - PullRequest
5 голосов
/ 06 апреля 2011

Как вы интегрируетесь в NServiceBus при использовании EventStore?

Я новичок в NSB и ES и пытаюсь определить наилучшие настройки для NSB при использовании ES и CQRS.

Я подключаюсь к NSB так же, как DispatchCommit в примере, https://github.com/joliver/EventStore/blob/master/doc/EventStore.Example/MainProgram.cs

  1. Публикуете ли вы весь Commit или Commit.Events?
  2. Вы создаете оболочку для ваших сообщений, потому что NSB требует IMessage для ваших сообщений? Как вы публикуете в правильной очереди тогда? Потому что обертка является универсальной в отличие от OrderSubmittedEvent, например. Если возможно, я не хочу зависимости от NSB для моих событий, потому что тогда у меня также есть это в моем домене.

Некоторый код или руководство действительно приветствуются.

Ответы [ 2 ]

4 голосов
/ 06 апреля 2011

Вот что я использую в производстве:

public sealed class NServiceBusPublisher : IPublishMessages
{
    private const string AggregateIdKey = "AggregateId";
    private const string CommitVersionKey = "CommitVersion";
    private const string EventVersionKey = "EventVersion";
    private const string BusPrefixKey = "Bus.";
    private readonly IBus bus;

    public NServiceBusPublisher(IBus bus)
    {
        this.bus = bus;
    }

    public void Dispose()
    {
        GC.SuppressFinalize(this);
    }

    public void Publish(Commit commit)
    {
        for (var i = 0; i < commit.Events.Count; i++)
        {
            var eventMessage = commit.Events[i];
            var busMessage = eventMessage.Body as IMessage;
            AppendHeaders(busMessage, commit.Headers); // optional
            AppendHeaders(busMessage, eventMessage.Headers); // optional
            AppendVersion(commit, i); // optional
            this.bus.Publish(busMessage);
        }
    }
    private static void AppendHeaders(IMessage message, IEnumerable<KeyValuePair<string, object>> headers)
    {
        headers = headers.Where(x => x.Key.StartsWith(BusPrefixKey));
        foreach (var header in headers)
        {
            var key = header.Key.Substring(BusPrefixKey.Length);
            var value = (header.Value ?? string.Empty).ToString();
            message.SetHeader(key, value);
        }
    }
    private static void AppendVersion(Commit commit, int index)
    {
        var busMessage = commit.Events[index].Body as IMessage;
        busMessage.SetHeader(AggregateIdKey, commit.StreamId.ToString());
        busMessage.SetHeader(CommitVersionKey, commit.StreamRevision.ToString());
        busMessage.SetHeader(EventVersionKey, GetSpecificEventVersion(commit, index).ToString());
    }
    private static int GetSpecificEventVersion(Commit commit, int index)
    {
        // e.g. (StreamRevision: 120) - (5 events) + 1 + (index @ 4: the last index) = event version: 120
        return commit.StreamRevision - commit.Events.Count + 1 + index;
    }
}
2 голосов
/ 11 октября 2012

В третьей версии NSB была введена функция с именем Ненавязчивый режим .С его помощью вы можете избавиться от зависимости NSB в вашем домене.

Эта новая функция в NServiceBus V3 позволяет вам передавать собственные соглашения, чтобы определить, какие типы являются определениями сообщений, вместо использования IMessage, ICommandили интерфейсы IEvent.

...