Выполните метод после некоторого бездействия, используя Rx .Net - PullRequest
0 голосов
/ 17 июня 2019

У меня есть действие контроллера, подобное этому:

[HttpPost("Post")]
public async Task Post([FromBody] UpdateDataCommand command)
{
    await _mediator.Send(command);
}

Это выполняется в .Net Core и использует MediatR для обработки команд.

Теперь UpdateDataCommand имеет целочисленное StationId свойство, которое идентифицирует номер станции.Когда клиентское приложение вызывает этот метод, выполняя публикацию, оно обновляет данные в базе данных.Я хочу использовать Rx .Net для запуска таймера после команды Await _mediator.Send (команда).Таймер будет установлен на 1 минуту.Через 1 минуту я хочу вызвать другой метод, который установит флаг в базе данных, но только для этого StationId.Если кто-то делает публикацию, используя тот же StationId, таймер должен сбросить сам себя.

В псевдокоде выглядит так:

[HttpPost("Post")]
public async Task Post([FromBody] UpdateDataCommand command)
{
    int stationId = command.StationId;
    // let's assume stationId==2

    //saves data for stationId==2
    await _mediator.Send(command);

    //Start a timer of 1 min
    //if timer fires (meaning the 1 minute has passed) call Method2();
    //if client does another "Post" for stationId==2 in the meantime 
      (let's say that the client does another "Post" for stationId==2 after 20 sec)
      then reset the timer
}

Как это сделать с помощью Reactive Extensions in.Net?

ОБНОВЛЕНИЕ (@Enigmativity) : Это все еще не работает, я установил таймер на 10 секунд, и если вы посмотрите на время вывода, вы увидите, что я сделал сообщение на09:17:49 (который запустил таймер на 10 секунд), затем я сделал новое сообщение в 09:17:55 (который запустил другой таймер, но он должен был только сбросить старый), и оба таймера сработаличерез 10 секунд после первого вызова и через 10 секунд после второго вызова .: Application output

Ответы [ 2 ]

1 голос
/ 18 июня 2019

Я не смог проверить это, но я думаю, что это довольно близко:

private Subject<UpdateDataCommand> posted = new Subject<UpdateDataCommand>();

private void PostInitialize()
{
    posted
        .GroupBy(x => x.StationId)
        .Select(gxs =>
            gxs
                .Select(x =>
                    Observable
                        .Timer(TimeSpan.FromMinutes(1.0))
                        .Select(_ => x))
                .Switch())
        .Merge()
        .Subscribe(stationId =>
        {
            /* update database */
        });
}

public async Task Post(UpdateDataCommand command)
{
    int stationId = command.StationId;
    await _mediator.Send(command);
    posted.OnNext(command);
}

Дайте мне знать, если это близко.

Вам нужно позвонить PostInitialize, чтобы настроить его, прежде чем начать публиковать команды обновления данных.


Вот тест, который показывает, что это работает:

var rnd = new Random();

var posted =
    Observable
        .Generate(
            0, x => x < 20, x => x + 1, x => x,
            x => TimeSpan.FromSeconds(rnd.NextDouble()));

posted
    .GroupBy(x => x % 3)
    .Select(gxs =>
        gxs
            .Select(x =>
                Observable
                    .Timer(TimeSpan.FromSeconds(1.9))
                    .Select(_ => x))
            .Switch())
    .Merge()
    .Subscribe(x => Console.WriteLine(x));

Я получаю результаты как:

3
4
14
15
17
18
19

Поскольку я использовал .GroupBy(x => x % 3), это всегда будет выводить 17, 18, & 19 - но будет выводить более ранние числа, если случайный интервал достаточно велик.

0 голосов
/ 18 июня 2019

Чтобы запустить таймер с помощью Rx.Net, мы можем вызвать:

var subscription = Observable.Timer(TimeSpan.FromSeconds(timeout))
    .Subscribe(
        value =>{    /* ... */ }
    );

Чтобы отменить эту подписку, нам просто нужно удалить эту подписку позже:

subscription.Dispose();

Проблема в том, как сохранить подписку. Один из подходов заключается в создании службы SubscriptionManager (singleton), поэтому мы можем вызвать такую ​​службу для планирования задачи, а затем отменить ее позже в рамках действия контроллера, как показано ниже:

// you controller class

    private readonly ILogger<HomeController> _logger;       // injected by DI
    private readonly SubscriptionManager _subscriptionMgr;  // injected by DI


    public async Task Post(...)
    {
        ...
        // saves data for #stationId
        // Start a timer of 1 min
        this._subscriptionMgr.ScheduleForStationId(stationId);    // schedule a task that for #stationId that will be executed in 60s
    }


    [HttpPost("/Command2")]
    public async Task Command2(...)
    {
        int stationId =  command.StationId;
        if( shouldCancel ){
            this._subscriptionMgr.CancelForStationId(stationId);  // cancel previous task for #stationId
        }
    }

Если вы хотите управлять подписками в памяти, мы можем использовать ConcurrentDictionary для хранения подсписок:

public class SubscriptionManager : IDisposable
{
    private ConcurrentDictionary<string,IDisposable> _dict;
    private readonly IServiceProvider _sp;
    private readonly ILogger<SubscriptionManager> _logger;

    public SubscriptionManager(IServiceProvider sp, ILogger<SubscriptionManager> logger)
    {
        this._dict= new ConcurrentDictionary<string,IDisposable>();
        this._sp = sp;
        this._logger = logger;
    }

    public IDisposable ScheduleForStationId(int stationId)
    {
        var timeout = 60;
        this._logger.LogWarning($"Task for Station#{stationId} will be exexuted in {timeout}s") ;
        var subscription = Observable.Timer(TimeSpan.FromSeconds(timeout))
            .Subscribe(
                value =>{  
                    // if you need update the db, create a new scope:
                    using(var scope = this._sp.CreateScope()){
                        var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
                        var station=dbContext.StationStatus.Where(ss => ss.StationId == stationId)
                            .FirstOrDefault();
                        station.Note = "updated";
                        dbContext.SaveChanges();
                    }
                    this._logger.LogWarning($"Task for Station#{stationId} has been executed") ;
                },
                e =>{
                    Console.WriteLine("Error!"+ e.Message);
                }
            );
        this._dict.AddOrUpdate( stationId.ToString(), subscription , (id , sub)=> {
            sub.Dispose();       // dispose the old one
            return subscription;
        });
        return subscription;
    }

    public void CancelForStationId(int stationId)
    {
        IDisposable subscription = null;
        this._dict.TryGetValue(stationId.ToString(), out subscription);
        this._logger.LogWarning($"Task for station#{stationId} has been canceled");
        subscription?.Dispose();

        // ... if you want to update the db , create a new scope
        using(var scope = this._sp.CreateScope()){
            var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
            var station=dbContext.StationStatus.Where(ss => ss.StationId == stationId)
                .FirstOrDefault();
            station.Note = "canceled";
            dbContext.SaveChanges();
            this._logger.LogWarning("The db has been changed");
        }
    }

    public void Dispose()
    {
        foreach(var entry in this._dict){
            entry.Value.Dispose();
        }
    }
}

Другой подход заключается в создании плоской записи в диспетчере задач (например, cron), но он вообще не будет использовать Rx.NET.

...