Чтобы запустить таймер с помощью Rx.Net
, мы можем вызвать:
var subscription = Observable.Timer(TimeSpan.FromSeconds(timeout))
.Subscribe(
value =>{ /* ... */ }
);
Чтобы отменить эту подписку, нам просто нужно удалить эту подписку позже:
subscription.Dispose();
Проблема в том, как сохранить подписку. Один из подходов заключается в создании службы SubscriptionManager
(singleton), поэтому мы можем вызвать такую службу для планирования задачи, а затем отменить ее позже в рамках действия контроллера, как показано ниже:
// you controller class
private readonly ILogger<HomeController> _logger; // injected by DI
private readonly SubscriptionManager _subscriptionMgr; // injected by DI
public async Task Post(...)
{
...
// saves data for #stationId
// Start a timer of 1 min
this._subscriptionMgr.ScheduleForStationId(stationId); // schedule a task that for #stationId that will be executed in 60s
}
[HttpPost("/Command2")]
public async Task Command2(...)
{
int stationId = command.StationId;
if( shouldCancel ){
this._subscriptionMgr.CancelForStationId(stationId); // cancel previous task for #stationId
}
}
Если вы хотите управлять подписками в памяти, мы можем использовать ConcurrentDictionary
для хранения подсписок:
public class SubscriptionManager : IDisposable
{
private ConcurrentDictionary<string,IDisposable> _dict;
private readonly IServiceProvider _sp;
private readonly ILogger<SubscriptionManager> _logger;
public SubscriptionManager(IServiceProvider sp, ILogger<SubscriptionManager> logger)
{
this._dict= new ConcurrentDictionary<string,IDisposable>();
this._sp = sp;
this._logger = logger;
}
public IDisposable ScheduleForStationId(int stationId)
{
var timeout = 60;
this._logger.LogWarning($"Task for Station#{stationId} will be exexuted in {timeout}s") ;
var subscription = Observable.Timer(TimeSpan.FromSeconds(timeout))
.Subscribe(
value =>{
// if you need update the db, create a new scope:
using(var scope = this._sp.CreateScope()){
var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var station=dbContext.StationStatus.Where(ss => ss.StationId == stationId)
.FirstOrDefault();
station.Note = "updated";
dbContext.SaveChanges();
}
this._logger.LogWarning($"Task for Station#{stationId} has been executed") ;
},
e =>{
Console.WriteLine("Error!"+ e.Message);
}
);
this._dict.AddOrUpdate( stationId.ToString(), subscription , (id , sub)=> {
sub.Dispose(); // dispose the old one
return subscription;
});
return subscription;
}
public void CancelForStationId(int stationId)
{
IDisposable subscription = null;
this._dict.TryGetValue(stationId.ToString(), out subscription);
this._logger.LogWarning($"Task for station#{stationId} has been canceled");
subscription?.Dispose();
// ... if you want to update the db , create a new scope
using(var scope = this._sp.CreateScope()){
var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var station=dbContext.StationStatus.Where(ss => ss.StationId == stationId)
.FirstOrDefault();
station.Note = "canceled";
dbContext.SaveChanges();
this._logger.LogWarning("The db has been changed");
}
}
public void Dispose()
{
foreach(var entry in this._dict){
entry.Value.Dispose();
}
}
}
Другой подход заключается в создании плоской записи в диспетчере задач (например, cron
), но он вообще не будет использовать Rx.NET.