RabbitMq - ConversationId vs CorrelationId - Что больше подходит для отслеживания конкретного запроса? - PullRequest
2 голосов
/ 21 марта 2019

RabbitMQ, похоже, имеет два очень похожих свойства, и я не совсем понимаю разницу. ConversationId и CorrelationId.

Мой вариант использования следующий. У меня есть сайт, который генерирует Guid. Веб-сайт вызывает API, добавляя этот уникальный идентификатор в заголовки HttpRequest. Это в свою очередь публикует сообщение для RabbitMQ. Это сообщение обрабатывается первым потребителем и передается в другое место другому потребителю и т. Д.

В целях ведения журнала я хочу зарегистрировать идентификатор, который связывает первоначальный запрос со всеми последующими действиями. Это должно быть уникальным для этого путешествия по различным частям приложения. Следовательно. При входе в систему, например Serilog / ElasticSearch, становится легко увидеть, какой запрос вызвал первоначальный запрос, и все записи журнала для этого запроса во всем приложении можно сопоставить вместе.

Я создал провайдера, который смотрит на входящий HttpRequest идентификатор. Я назвал это «CorrelationId», но я начинаю задаваться вопросом, должно ли это действительно называться «ConversationId». С точки зрения RabbitMQ, идея «ConversationId» лучше подходит для этой модели или «CorrelationId» лучше?

В чем разница между этими двумя понятиями?

С точки зрения кода, я хотел сделать следующее. Сначала зарегистрируйте шину в моем API и настройте SendPublish для использования CorrelationId от провайдера.

// bus registration in the API
var busSettings = context.Resolve<BusSettings>();
// using AspNetCoreCorrelationIdProvider
var correlationIdProvider = context.Resolve<ICorrelationIdProvider>();

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host(
        new Uri(busSettings.HostAddress),
        h =>
        {
            h.Username(busSettings.Username);
            h.Password(busSettings.Password);
        });
    cfg.ConfigurePublish(x => x.UseSendExecute(sendContext =>
    {
        // which one is more appropriate
        //sendContext.ConversationId = correlationIdProvider.GetCorrelationId();
        sendContext.CorrelationId = correlationIdProvider.GetCorrelationId();
    }));
});

Для справки, это мой простой интерфейс провайдера

// define the interface
public interface ICorrelationIdProvider
{
    Guid GetCorrelationId();
}

И реализация AspNetCore, которая извлекает уникальный идентификатор, установленный вызывающим клиентом (то есть веб-сайтом).

public class AspNetCoreCorrelationIdProvider : ICorrelationIdProvider
{
    private IHttpContextAccessor _httpContextAccessor;

    public AspNetCoreCorrelationIdProvider(IHttpContextAccessor httpContextAccessor)
    {
        _httpContextAccessor = httpContextAccessor;
    }

    public Guid GetCorrelationId()
    {
        if (_httpContextAccessor.HttpContext.Request.Headers.TryGetValue("correlation-Id", out StringValues headers))
        {
            var header = headers.FirstOrDefault();
            if (Guid.TryParse(header, out Guid headerCorrelationId))
            {
                return headerCorrelationId;
            }
        }

        return Guid.NewGuid();
    }
}

Наконец, мои узлы службы - это простые приложения службы Windows, которые сидят и принимают опубликованные сообщения. Они используют следующее для захвата CorrelationId и могут публиковать его и для других потребителей, а также на других хостах служб.

public class MessageContextCorrelationIdProvider : ICorrelationIdProvider
{
    /// <summary>
    /// The consume context
    /// </summary>
    private readonly ConsumeContext _consumeContext;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessageContextCorrelationIdProvider"/> class.
    /// </summary>
    /// <param name="consumeContext">The consume context.</param>
    public MessageContextCorrelationIdProvider(ConsumeContext consumeContext)
    {
        _consumeContext = consumeContext;
    }

    /// <summary>
    /// Gets the correlation identifier.
    /// </summary>
    /// <returns></returns>
    public Guid GetCorrelationId()
    {
        // correlationid or conversationIs?
        if (_consumeContext.CorrelationId.HasValue && _consumeContext.CorrelationId != Guid.Empty)
        {
            return _consumeContext.CorrelationId.Value;
        }

        return Guid.NewGuid();
    }
}

У меня в клиенте есть логгер, который использует этот провайдер для извлечения CorrelationId:

public async Task Consume(ConsumeContext<IMyEvent> context)
{
    var correlationId = _correlationProvider.GetCorrelationId();
    _logger.Info(correlationId, $"#### IMyEvent received for customer:{context.Message.CustomerId}");

    try
    {
        await _mediator.Send(new SomeOtherRequest(correlationId) { SomeObject: context.Message.SomeObject });
    }
    catch (Exception e)
    {
        _logger.Exception(e, correlationId, $"Exception:{e}");
        throw;
    }

    _logger.Info(correlationId, $"Finished processing: {DateTime.Now}");
}

Читая документы , он говорит следующее о "ConversationId":

Разговор создается первым отправленным сообщением или опубликовано, в котором нет существующего контекста (например, когда сообщение отправлено или опубликовано с использованием IBus.Send или IBus.Publish). Если существующий контекст используется для отправки или публикации сообщения, ConversationId копируется в новое сообщение, гарантируя, что набор сообщения в одном и том же диалоге имеют одинаковый идентификатор.

Теперь я начинаю думать, что моя терминология перепутана, и технически это разговор (хотя «разговор» похож на «телефонную игру»).

Итак, CorrelationId в этом случае использования или ConversationId? Пожалуйста, помогите мне правильно понять мою терминологию !!

1 Ответ

3 голосов
/ 22 марта 2019

В разговоре сообщений (cue foreboding музыкальная партитура) может быть одно сообщение (я сказал вам что-то сделать, или я сказал всем, кто слушает, что что-то случилось) или несколько сообщений (я сказал вам сделать что-то, и вы сказали кому-то другому, или я сказал всем, кто слушает, что-то случилось, и эти слушатели рассказали своим друзьям, и так далее, и так далее).

При использовании MassTransit, от первого сообщения до последнего сообщения, при правильном использовании, каждое из этих сообщений будет иметь одинаковый ConversationId. MassTransit копирует свойство из ConsumeContext без изменений в каждое исходящее сообщение во время использования сообщения. Это делает все частью одного и того же следа - разговора.

Однако CorrelationId не устанавливается по умолчанию в MassTransit. Он может быть установлен автоматически, если свойство сообщения называется CorrelationId (или CommandId, или EventId), или вы также можете добавить свои собственные имена.

Если CorrelationId присутствует в потребленном сообщении, любые исходящие сообщения будут иметь свойство CorrelationId, скопированное в свойство InitiatorId (причина и следствие - использованное сообщение инициировало создание последующих сообщений). Это формирует цепочку (или span, в терминологии трассировки), за которой можно следовать, чтобы показать распространение сообщений из исходного сообщения.

Идентификатор CorrelationId следует рассматривать как идентификатор команды или события, чтобы эффекты этой команды можно было видеть во всех системных журналах.

Мне кажется, что ваш ввод из HTTP может быть Инициатором, и, таким образом, скопируйте этот идентификатор в InitiatorId и создайте новый CorrelationId для сообщения, или вы можете просто использовать тот же идентификатор для исходного CorrelationId и позволить последующие сообщения используют его в качестве инициатора.

...