Я собираю подтверждение концепции, используя Mass Transit с RabbitMq и Automatonymous в приложении asp.net core 2.1.Я использую ядро EntityFramework с Postgres для сохранения.
Я пытаюсь запустить сагу, когда делается запрос к http rest api, и вернуть результат, как только сага завершится.Что я делаю:
- подключите событие, чтобы запустить мою сагу, используя интерфейс с клиентом запроса / ответа
- , в саге опубликуйте сообщение, которое потребляетсяпотребитель
- в потребитель публикует сообщение, соответствующее другому событию в моей саге
- возвращает ответ из моей саги по завершении и завершает
Это мойкод:
мои интерфейсы
public interface IStartSagaRequest
{
Guid CorrelationId { get; set; }
string Name {get; set;}
}
public interface IStartSagaResponse
{
Guid CorrelationId { get; set; }
bool DidComplete {get; set;}
}
public IDoOperationRequest
{
Guid CorrelationId { get; set; }
}
public IOperationComplete
{
Guid CorrelationId { get; set; }
bool OperationSuccessful {get; set;}
}
Мой экземпляр саги
public class DoOperationSaga : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public Name { get; set; }
public string CurrentState { get; set; }
}
конкретная реализация IDoOperationRequest, используемая для публикации в конечном автомате
public class DoOperationRequestImpl : IDoOperationRequest
{
public Guid CorrelationId { get; set; }
}
конкретная реализация IStartSagaResponse, используемая для публикации в конечном автомате
public class StartSagaResponse : IStartSagaResponse
{
public Guid CorrelationId { get; set; }
public bool DidComplete {get; set;}
}
Мой конечный автомат
public class ProcessOperationStateMachine : MassTransitStateMachine<DoOperationSaga>
{
public State OperationPending { get; private set; }
public State Complete { get; private set; }
public Event<IOperationComplete> OperationCompleteEvent { get; private set; }
public Event<IStartSagaRequest> StartSagaRequestEvent { get; private set; }
public ProcessOperationStateMachine()
{
InstanceState(doOperationSagaInstance => doOperationSagaInstance.CurrentState);
Event(() => StartSagaRequestEvent, eventConfigurator =>
{
eventConfigurator.CorrelateById(doOperationSaga => doOperationSaga.CorrelationId,
context => context.Message.CorrelationId).SelectId(c => Guid.NewGuid());
});
Event(() => OperationCompleteEvent, eventConfigurator =>
{
eventConfigurator.CorrelateById(doOperationSaga => doOperationSaga.CorrelationId,
context => context.Message.CorrelationId);
});
Initially(
When(StartSagaRequestEvent)
.Then(context =>
{
context.Instance.CorrelationId = context.Data.CorrelationId;
context.Instance.Name = context.Data.Name;
context.Publish(new DoOperationRequestImpl
{
CorrelationId = context.Data.CorrelationId
});
})
.TransitionTo(OperationPending)
);
During(OperationPending,
When(OperationCompleteEvent)
.Then(context =>
{
// I'm just doing this for debugging
context.Instance.Name = "changed in operationComplete";
})
.ThenAsync(context => context.RespondAsync(new StartSagaResponse
{
CorrelationId = context.Data.CorrelationId,
DidComplete = true
}))
.Finalize());
}
Мой потребитель:
public class DoOperationRequestConsumer : IConsumer<ISBDoOperationRequest>
{
public async Task Consume(ConsumeContext<ISBDoOperationRequest> context)
{
await context.Publish<IOperationComplete>(new
{
CorrelationId = context.Message.CorrelationId,
OperationSuccessful = true
});
}
}
Как я подключаю вещи вDI в Startup.cs
public void ConfigureServices(IServiceCollection services)
{
stateMachine = new ProcessOperationStateMachine();
SagaDbContextFactory factory = new SagaDbContextFactory();
EntityFrameworkSagaRepository<DoOperationSaga> repository = new EntityFrameworkSagaRepository<DoOperationSaga>(factory);
services.AddMassTransit(x =>
{
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(sbc =>
{
IRabbitMqHost host = sbc.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
sbc.ReceiveEndpoint(host, "do-operation", ep =>
{
ep.UseMessageRetry(c => c.Interval(2, 100));
ep.StateMachineSaga(stateMachine, repository);
ep.Durable = false;
});
sbc.ReceiveEndpoint(host, "consumer-queue", ep =>
{
ep.Consumer(() => new DoOperationRequestConsumer());
ep.Durable = false;
});
}));
x.AddConsumer<DoOperationRequestConsumer>();
});
services.AddScoped<DoOperationRequestConsumer>();
services.AddScoped(p =>
p.GetRequiredService<IBusControl>()
.CreateRequestClient<IDoOperationRequest, IDoOperationResponse>(
new Uri("rabbitmq://localhost/do-operation?durable=false"),
TimeSpan.FromSeconds(30)));
}
и выполнение запроса в моем контроллере:
public IRequestClient<IDoOperationRequest, IDoOperationResponse> _doOperationClient { get; set; }
...
var guid = Guid.NewGuid();
_doOperationClient.Request(new
{
Name = "from the controller",
CorrelationId = guid
});
Я вижу, что мой конечный автомат запускается.Когда (StartSagaRequestEvent) получает удар и публикуется сообщение DoOperationRequest.DoOperationRequestConsumer получает сообщение и публикует сообщение IOperationComplete.Однако на этом все и заканчивается.Мой IOperationCompleteEvent в моей машине состояний не вызывается.Когда я просматриваю базу данных, я вижу, что мой экземпляр саги создается с помощью guid, а для CurrentState установлено значение OperationPending.Когда я смотрю на свою панель управления rabbitmq, я вижу сообщение, опубликованное после того, как мой DoOperationRequestConsumer выполнил публикацию сообщения IOperationComplete.Я просто не вижу, чтобы конечный автомат потреблял сообщение IOperationComplete, опубликованное потребителем.Когда я устанавливаю точку останова и проверяю сообщение в Consumer, я вижу, что CorrelationId установлен на то же значение CorrelationId саги.
Я также пытался явно использовать очередь "do-operation" в получателе:
public async Task Consume(ConsumeContext<ISBDoOperationRequest> context)
{
ISendEndpoint sendEndpoint = await context.GetSendEndpoint(new Uri("rabbitmq://localhost/do-operation?durable=false"));
await sendEndpoint.Send<IOperationComplete>(new
{
CorrelationId = context.Message.CorrelationId,
OperationSuccessful = true
});
}
, но все еще не смог установить соединение.
Я весь день бился об это и не уверен, что мне здесь не хватает.Если бы кто-нибудь мог дать мне несколько советов о том, что я могу делать неправильно, я был бы очень признателен, опять же извините за стену текста, я знаю, что это достаточно для чтения, но я хотел уточнить, что я делал.Большое спасибо!