У меня есть подпрограмма, которая читает данные из БД и публикует эти данные через IObservable.
После публикации данных я хочу обновить все те строки, которые только что были опубликованы, чтобы остановить их повторную публикацию.
Я не уверен в «реактивном» способе сделать это. (Каждый раз, когда я снова посещаю Rx, у меня возникает эта проблема!)
Я думаю, мне нужно сделать 2 вещи
1) Кэшируйте данные в том виде, в котором они опубликованы, поскольку у них будут идентификаторы, которые мне нужно будет обновить впоследствии - я задавался вопросом, использовать ли субъект для кэширования публикуемых данных или использовать какую-то другую подпрограмму, которая оборачивает то, что я в настоящее время есть, подписаться на него, кэшировать его и затем переиздать
2) Обновите данные после их публикации. Я действительно не уверен, как вообще встроить это в конвейер!
Я построил это из различных вещей, которые я нашел в сети (особенно Ли Кэмпбелл для опроса дБ - спасибо Ли!), Но другие биты, которые я добавил, мои, и я, возможно, получил это плохо неправильно. Я открыт для предложений, если некоторые части будут лучше реализованы безреактивно. Например, я сделал процедуру обновления базы данных наблюдаемой, но я не знаю, действительно ли это необходимо - или просто проще включить ее в конвейер, если она реализована таким образом ...
Вот соответствующие биты кода ...
private IObservable<INotification> Poller() =>
Observable
.Timer(_pollingPeriod, _scheduler)
.SelectMany(_ => NewNotifications(_cx))
.Timeout(_pollingPeriod + _queryTimeout, Observable.Return(TimeOut.Notification()), _scheduler)
.Catch<INotification, Exception>(err => Observable.Return(Error.Notification(err)))
.Repeat();
private IObservable<INotification> NewNotifications(string cx)
{
try
{
return SqlRead<INotification>(cx, NewNotificationsSql(),sdr => EventBuilder(sdr), Empty.Notification());
}
catch (Exception ex)
{
throw ex;
}
}
internal static IObservable<T> SqlRead<T>(string cx, string sql, Func<SqlDataReader, T> mapper, T noRows) =>
Observable.Create<T>(o =>
{
using (var conn = new SqlConnection(cx))
{
conn.Open();
using (var cmd = new SqlCommand(sql, conn))
{
using (var rdr = cmd.ExecuteReader())
{
if (!rdr.HasRows)
{
o.OnNext(noRows);
}
else
{
while (rdr.Read())
{
o.OnNext(mapper(rdr));
}
}
}
}
}
o.OnCompleted();
return Disposable.Empty;
});
internal static IObservable<int> SqlWrite(string cx, string sql) =>
Observable.Create<int>(o =>
{
using (var conn = new SqlConnection(cx))
{
conn.Open();
using (var cmd = new SqlCommand(sql, conn))
{
o.OnNext(cmd.ExecuteNonQuery());
}
}
o.OnCompleted();
return Disposable.Empty;
});