Хорошо, я понял это.У меня, вероятно, были проблемы, потому что я не только новичок в Masstransit / Automatonymous и RabbitMQ, но также не имею большого опыта работы с C #.
Так что, если у кого-то когда-нибудь возникнет такая же проблема, вот что выпотребность: учитывая приведенный выше пример, существует три различных типа плюс несколько небольших интерфейсов:
- Отправитель (в данном случае «запросчик»), включая определенного потребителя
- Служба, котораяпотребляет определенные типы сообщений («валидатор» и «интерпретатор»)
- Служба, которая содержит конечный автомат без определенного потребителя
- Некоторые «контракты», которые являются интерфейсами, определяющими тип сообщенияотправлено / использовано
1) Это отправитель:
using InterpreterStateMachine.Contracts;
using MassTransit;
using System;
using System.Threading.Tasks;
namespace InterpreterStateMachine.Requester
{
class Program
{
private static IBusControl _busControl;
static void Main(string[] args)
{
var busControl = ConfigureBus();
busControl.Start();
Console.WriteLine("Enter request or quit to exit: ");
while (true)
{
Console.Write("> ");
String value = Console.ReadLine();
if ("quit".Equals(value,StringComparison.OrdinalIgnoreCase))
break;
if (value != null)
{
String[] values = value.Split(';');
foreach (String v in values)
{
busControl.Publish<IRequesting>(new
{
Request = new Request(v),
TimeStamp = DateTime.UtcNow
});
}
}
}
busControl.Stop();
}
static IBusControl ConfigureBus()
{
if (null == _busControl)
{
_busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(/*rabbitMQ server*/), h =>
{
/*credentials*/
});
cfg.ReceiveEndpoint(host, "answer_ready", e =>
{
e.Durable = true;
//here the consumer is registered
e.Consumer<AnswerConsumer>();
});
});
_busControl.Start();
}
return _busControl;
}
//here comes the actual logic of the consumer, which consumes a "contract"
class AnswerConsumer : IConsumer<IAnswerReady>
{
public async Task Consume(ConsumeContext<IAnswerReady> context)
{
await Console.Out.WriteLineAsync($"\nReceived Answer for \"{context.Message.Request.RequestString}\": {context.Message.Answer}.");
await Console.Out.WriteAsync(">");
}
}
}
}
2) Это услуга (вот онаявляется валидационным)
using InterpreterStateMachine.Contracts;
using MassTransit;
using MassTransit.QuartzIntegration;
using MassTransit.RabbitMqTransport;
using Quartz;
using Quartz.Impl;
using System;
using System.Threading.Tasks;
using Topshelf;
namespace InterpreterStateMachine.Validator
{
public class ValidationService : ServiceControl
{
readonly IScheduler _scheduler;
static IBusControl _busControl;
BusHandle _busHandle;
public static IBus Bus => _busControl;
public ValidationService()
{
_scheduler = CreateScheduler();
}
public bool Start(HostControl hostControl)
{
Console.WriteLine("Creating bus...");
_busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(x =>
{
IRabbitMqHost host = x.Host(new Uri(/*rabbitMQ server*/), h =>
{
/*credentials*/
});
x.UseInMemoryScheduler();
x.UseMessageScheduler(new Uri(RabbitMqServerAddress));
x.ReceiveEndpoint(host, "validation_needed", e =>
{
e.PrefetchCount = 1;
e.Durable = true;
//again this is how the consumer is registered
e.Consumer<RequestConsumer>();
});
});
Console.WriteLine("Starting bus...");
try
{
_busHandle = MassTransit.Util.TaskUtil.Await<BusHandle>(() => _busControl.StartAsync());
_scheduler.JobFactory = new MassTransitJobFactory(_busControl);
_scheduler.Start();
}
catch (Exception)
{
_scheduler.Shutdown();
throw;
}
return true;
}
public bool Stop(HostControl hostControl)
{
Console.WriteLine("Stopping bus...");
_scheduler.Standby();
_busHandle?.Stop();
_scheduler.Shutdown();
return true;
}
static IScheduler CreateScheduler()
{
ISchedulerFactory schedulerFactory = new StdSchedulerFactory();
IScheduler scheduler = MassTransit.Util.TaskUtil.Await<IScheduler>(() => schedulerFactory.GetScheduler());
return scheduler;
}
}
//again here comes the actual consumer logic, look how the message is re-published after it was checked
class RequestConsumer : IConsumer<IValidationNeeded>
{
public async Task Consume(ConsumeContext<IValidationNeeded> context)
{
await Console.Out.WriteLineAsync($"(c) Received {context.Message.Request.RequestString} for validation (Id: {context.Message.RequestId}).");
context.Message.Request.IsValid = context.Message.Request.RequestString.Contains("=");
//send the new message on the "old" context
await context.Publish<IValidating>(new
{
Request = context.Message.Request,
IsValid = context.Message.Request.IsValid,
TimeStamp = DateTime.UtcNow,
RequestId = context.Message.RequestId
});
}
}
}
Валидатор использует контракт "IValidationNeeded", а затем публикует контракт "IValidating", который затем будет использоваться самим конечным автоматом(событие «Проверка»).
3) Разница между обслуживанием потребителя и обслуживанием машины зависит от «ReceiveEndpoint».Здесь не зарегистрирован потребитель, но конечный автомат установлен:
...
InterpreterStateMachine _machine = new InterpreterStateMachine();
InMemorySagaRepository<InterpreterInstance> _repository = new InMemorySagaRepository<InterpreterInstance>();
...
x.ReceiveEndpoint(host, "state_machine", e =>
{
e.PrefetchCount = 1;
//here the state machine is set
e.StateMachineSaga(_machine, _repository);
e.Durable = false;
});
4) И последнее, но не менее важное: контракты довольно маленькие и выглядят так:
using System;
namespace InterpreterStateMachine.Contracts
{
public interface IValidationNeeded
{
Guid RequestId { get; }
Request Request { get; }
}
}
В общем, все довольно просто, мне просто пришлось использовать свой мозг: D
Надеюсь, это кому-нибудь поможет.