Как реализовать автомат с автоматомным языком в C # - PullRequest
0 голосов
/ 18 мая 2018

Я пытаюсь реализовать простой пример / демо для конечного автомата, использующего Automatonymous с RabbitMQ.К сожалению, я не смог найти тот, из которого можно было бы перестроить / научиться (я нашел ShoppingWeb , но на мой взгляд это совсем не просто).Также, по моему мнению, в документации недостаточно информации.

Это пример конечного автомата, о котором я подумал (извините, он довольно уродливый): enter image description here Обратите внимание, что этот пример полностью сделани это не важно, имеет ли это смысл или нет.Цель этого проекта - «прогреться» с помощью Automatonymous.

Что я хочу сделать / иметь:

  • Четыре запущенных приложения:
    1. Конечный автоматсам
    2. «Запрашивающий», отправляющий запросы для интерпретации
    3. «Проверяющий» или «анализатор», проверяющий, является ли предоставленный запрос действительным
    4. «Интерпретатор», интерпретирующий данноезапрос
  • Примером этого может быть:
    • Запрашивающий отправляет "x = 5"
    • Валидатор проверяет, содержится ли "="
    • Intepreter говорит "5"

Моя реализация конечного автомата выглядит следующим образом:

public class InterpreterStateMachine : MassTransitStateMachine<InterpreterInstance>
    {
        public InterpreterStateMachine()
        {
            InstanceState(x => x.CurrentState);
            Event(() => Requesting, x => x.CorrelateBy(request => request.Request.RequestString, context => context.Message.Request.RequestString)
                .SelectId(context => Guid.NewGuid())); 
            Event(() => Validating, x => x.CorrelateBy(request => request.Request.RequestString, context => context.Message.Request.RequestString));
            Event(() => Interpreting, x => x.CorrelateBy(request => request.Request.RequestString, context => context.Message.Request.RequestString));

            Initially(
                When(Requesting)
                    .Then(context =>
                    {
                        context.Instance.Request = new Request(context.Data.Request.RequestString);                        
                    })
                    .ThenAsync(context => Console.Out.WriteLineAsync($"Request received: {context.Data.Request.RequestString}"))
                    .Publish(context => new ValidationNeededEvent(context.Instance))
                    .TransitionTo(Requested)
                );

            During(Requested,
                When(Validating)
                    .Then(context =>
                    {
                        context.Instance.Request.IsValid = context.Data.Request.IsValid;
                        if (!context.Data.Request.IsValid)
                        {
                            this.TransitionToState(context.Instance, Error);
                        }
                        else
                        {
                            this.TransitionToState(context.Instance, RequestValid);
                        }
                    })
                    .ThenAsync(context => Console.Out.WriteLineAsync($"Request '{context.Data.Request.RequestString}' validated with {context.Instance.Request.IsValid}"))
                    .Publish(context => new InterpretationNeededEvent(context.Instance))
                    ,
                Ignore(Requesting),
                Ignore(Interpreting)
                );

            During(RequestValid,
                When(Interpreting)
                    .Then((context) =>
                    {
                        //do something
                    })
                    .ThenAsync(context => Console.Out.WriteLineAsync($"Request '{context.Data.Request.RequestString}' interpreted with {context.Data.Answer}"))
                    .Publish(context => new AnswerReadyEvent(context.Instance))
                    .TransitionTo(AnswerReady)
                    .Finalize(),
                Ignore(Requesting),
                Ignore(Validating)
                );

            SetCompletedWhenFinalized();
        }

        public State Requested { get; private set; }
        public State RequestValid { get; private set; }
        public State AnswerReady { get; private set; }
        public State Error { get; private set; }

        //Someone is sending a request to interprete
        public Event<IRequesting> Requesting { get; private set; }
        //Request is validated
        public Event<IValidating> Validating { get; private set; }
        //Request is interpreted
        public Event<IInterpreting> Interpreting { get; private set; }


        class ValidationNeededEvent : IValidationNeeded
        {
            readonly InterpreterInstance _instance;

            public ValidationNeededEvent(InterpreterInstance instance)
            {
                _instance = instance;
            }

            public Guid RequestId => _instance.CorrelationId;

            public Request Request => _instance.Request;
        }

        class InterpretationNeededEvent : IInterpretationNeeded
        {
            readonly InterpreterInstance _instance;

            public InterpretationNeededEvent(InterpreterInstance instance)
            {
                _instance = instance;
            }

            public Guid RequestId => _instance.CorrelationId;
        }

        class AnswerReadyEvent : IAnswerReady
        {
            readonly InterpreterInstance _instance;

            public AnswerReadyEvent(InterpreterInstance instance)
            {
                _instance = instance;
            }

            public Guid RequestId => _instance.CorrelationId;
        }    
    }

Тогда у меня есть такие сервисы:

public class RequestService : ServiceControl
    {
        readonly IScheduler scheduler;
        IBusControl busControl;
        BusHandle busHandle;
        InterpreterStateMachine machine;
        InMemorySagaRepository<InterpreterInstance> repository;

        public RequestService()
        {
            scheduler = CreateScheduler();
        }

        public bool Start(HostControl hostControl)
        {
            Console.WriteLine("Creating bus...");

            machine = new InterpreterStateMachine();
            repository = new InMemorySagaRepository<InterpreterInstance>();


            busControl = Bus.Factory.CreateUsingRabbitMq(x =>
            {
                IRabbitMqHost host = x.Host(new Uri(/*rabbitMQ server*/), h =>
                {
                    /*credentials*/
                });

                x.UseInMemoryScheduler();

                x.ReceiveEndpoint(host, "interpreting_answer", e =>
                {
                    e.PrefetchCount = 5; //?
                    e.StateMachineSaga(machine, repository);
                });

                x.ReceiveEndpoint(host, "2", e =>
                {
                    e.PrefetchCount = 1;
                    x.UseMessageScheduler(e.InputAddress);

                    //Scheduling !?

                    e.Consumer(() => new ScheduleMessageConsumer(scheduler));
                    e.Consumer(() => new CancelScheduledMessageConsumer(scheduler));
                });

            });

            Console.WriteLine("Starting bus...");

            try
            {
                busHandle = MassTransit.Util.TaskUtil.Await<BusHandle>(() => busControl.StartAsync());
                scheduler.JobFactory = new MassTransitJobFactory(busControl);
                scheduler.Start();
            }
            catch (Exception)
            {
                scheduler.Shutdown();
                throw;
            }

            return true;
        }

        public bool Stop(HostControl hostControl)
        {
            Console.WriteLine("Stopping bus...");

            scheduler.Standby();

            if (busHandle != null) busHandle.Stop();

            scheduler.Shutdown();

            return true;
        }

        static IScheduler CreateScheduler()
        {
            ISchedulerFactory schedulerFactory = new StdSchedulerFactory();
            IScheduler scheduler = MassTransit.Util.TaskUtil.Await<IScheduler>(() => schedulerFactory.GetScheduler()); ;

            return scheduler;
        }
    }

Мои вопросы:

  1. Как отправить «начальный» запрос, чтобы конечный автомат перешел в мое исходное состояние
  2. Какя "реагирую" на потребителей, чтобы проверить отправленные данные, а затем отправить новые данные, как в 1?

1 Ответ

0 голосов
/ 06 июня 2018

Хорошо, я понял это.У меня, вероятно, были проблемы, потому что я не только новичок в Masstransit / Automatonymous и RabbitMQ, но также не имею большого опыта работы с C #.

Так что, если у кого-то когда-нибудь возникнет такая же проблема, вот что выпотребность: учитывая приведенный выше пример, существует три различных типа плюс несколько небольших интерфейсов:

  1. Отправитель (в данном случае «запросчик»), включая определенного потребителя
  2. Служба, котораяпотребляет определенные типы сообщений («валидатор» и «интерпретатор»)
  3. Служба, которая содержит конечный автомат без определенного потребителя
  4. Некоторые «контракты», которые являются интерфейсами, определяющими тип сообщенияотправлено / использовано

1) Это отправитель:

    using InterpreterStateMachine.Contracts;
    using MassTransit;
    using System;
    using System.Threading.Tasks;

    namespace InterpreterStateMachine.Requester
    {
        class Program
        {
            private static IBusControl _busControl;

            static void Main(string[] args)
            {            
                var busControl = ConfigureBus();
                busControl.Start();

                Console.WriteLine("Enter request or quit to exit: ");
                while (true)
                {
                    Console.Write("> ");
                    String value = Console.ReadLine();

                    if ("quit".Equals(value,StringComparison.OrdinalIgnoreCase))
                        break;

                    if (value != null)
                    {
                        String[] values = value.Split(';');

                        foreach (String v in values)
                        {
                            busControl.Publish<IRequesting>(new
                            {
                                Request = new Request(v),
                                TimeStamp = DateTime.UtcNow
                            });
                        }
                    }
                }

                busControl.Stop();
            }


            static IBusControl ConfigureBus()
            {
                if (null == _busControl)
                {
                    _busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
                    {                    
                        var host = cfg.Host(new Uri(/*rabbitMQ server*/), h =>
                        {                        
                            /*credentials*/
                        });

                        cfg.ReceiveEndpoint(host, "answer_ready", e =>
                        {
                            e.Durable = true;
                            //here the consumer is registered
                            e.Consumer<AnswerConsumer>();
                        });
                    });
                    _busControl.Start();
                }
                return _busControl;
            }

            //here comes the actual logic of the consumer, which consumes a "contract"
            class AnswerConsumer : IConsumer<IAnswerReady>
            {
                public async Task Consume(ConsumeContext<IAnswerReady> context)
                {
                    await Console.Out.WriteLineAsync($"\nReceived Answer for \"{context.Message.Request.RequestString}\": {context.Message.Answer}.");
                    await Console.Out.WriteAsync(">");
                }
            }        
        }
    }

2) Это услуга (вот онаявляется валидационным)

using InterpreterStateMachine.Contracts;
using MassTransit;
using MassTransit.QuartzIntegration;
using MassTransit.RabbitMqTransport;
using Quartz;
using Quartz.Impl;
using System;
using System.Threading.Tasks;
using Topshelf;

namespace InterpreterStateMachine.Validator
{
    public class ValidationService : ServiceControl
    {
        readonly IScheduler _scheduler;
        static IBusControl _busControl;
        BusHandle _busHandle;        

        public static IBus Bus => _busControl;

        public ValidationService()
        {
            _scheduler = CreateScheduler();
        }

        public bool Start(HostControl hostControl)
        {
            Console.WriteLine("Creating bus...");

            _busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(x =>
            {
                IRabbitMqHost host = x.Host(new Uri(/*rabbitMQ server*/), h =>
                {
                    /*credentials*/
                });

                x.UseInMemoryScheduler();
                x.UseMessageScheduler(new Uri(RabbitMqServerAddress));

                x.ReceiveEndpoint(host, "validation_needed", e =>
                {
                    e.PrefetchCount = 1;
                    e.Durable = true;
                    //again this is how the consumer is registered
                    e.Consumer<RequestConsumer>();
                });                               
            });

            Console.WriteLine("Starting bus...");

            try
            {
                _busHandle = MassTransit.Util.TaskUtil.Await<BusHandle>(() => _busControl.StartAsync());
                _scheduler.JobFactory = new MassTransitJobFactory(_busControl);
                _scheduler.Start();
            }
            catch (Exception)
            {
                _scheduler.Shutdown();
                throw;
            }                
            return true;
        }

        public bool Stop(HostControl hostControl)
        {
            Console.WriteLine("Stopping bus...");
            _scheduler.Standby();
            _busHandle?.Stop();
            _scheduler.Shutdown();
            return true;
        }

        static IScheduler CreateScheduler()
        {
            ISchedulerFactory schedulerFactory = new StdSchedulerFactory();
            IScheduler scheduler = MassTransit.Util.TaskUtil.Await<IScheduler>(() => schedulerFactory.GetScheduler());

            return scheduler;
        }
    }

    //again here comes the actual consumer logic, look how the message is re-published after it was checked
    class RequestConsumer : IConsumer<IValidationNeeded>
    {
        public async Task Consume(ConsumeContext<IValidationNeeded> context)
        {
            await Console.Out.WriteLineAsync($"(c) Received {context.Message.Request.RequestString} for validation (Id: {context.Message.RequestId}).");

            context.Message.Request.IsValid = context.Message.Request.RequestString.Contains("=");

            //send the new message on the "old" context
            await context.Publish<IValidating>(new
            {
                Request = context.Message.Request,
                IsValid = context.Message.Request.IsValid,
                TimeStamp = DateTime.UtcNow,
                RequestId = context.Message.RequestId
            });
        }
    }
}

Валидатор использует контракт "IValidationNeeded", а затем публикует контракт "IValidating", который затем будет использоваться самим конечным автоматом(событие «Проверка»).

3) Разница между обслуживанием потребителя и обслуживанием машины зависит от «ReceiveEndpoint».Здесь не зарегистрирован потребитель, но конечный автомат установлен:

...
InterpreterStateMachine _machine = new InterpreterStateMachine();
InMemorySagaRepository<InterpreterInstance> _repository = new InMemorySagaRepository<InterpreterInstance>();
...
x.ReceiveEndpoint(host, "state_machine", e =>
{
    e.PrefetchCount = 1;
    //here the state machine is set
    e.StateMachineSaga(_machine, _repository);
    e.Durable = false;
});

4) И последнее, но не менее важное: контракты довольно маленькие и выглядят так:

using System;

namespace InterpreterStateMachine.Contracts
{
    public interface IValidationNeeded
    {
        Guid RequestId { get; }
        Request Request { get; }
    }
}

В общем, все довольно просто, мне просто пришлось использовать свой мозг: D

Надеюсь, это кому-нибудь поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...