Я разрабатываю приложение, которое использует подключаемую наблюдаемую для публикации результатов.Эти результаты потребляются двумя наблюдателями в двух разных потоках;один наблюдаемый прогон в потоке пользовательского интерфейса отображает результаты в таблице и на диаграмме, а другой наблюдаемый прогон в пуле потоков по умолчанию отправляет результаты в конечную точку REST.
Однако в некоторых ситуациях послепеременное время IndexOutOfRangeException
выбрасывается.Я подозреваю, что это связано с проблемой параллелизма с моим потоком, который отправляет результаты в веб-службу REST.Если я не запускаю эту ветку, результаты отображаются просто отлично.Кроме того, нерегулярность вхождения является типичной для проблемы параллелизма.Я прикрепил трассировку стека исключения.
at System.Collections.Generic.List`1.Add(T item)
at System.Reactive.Linq.ObservableImpl.Buffer`1.CountExact.ExactSink.OnNext(TSource value)
at System.Reactive.Subjects.Subject`1.OnNext(T value)
at System.Reactive.Sink`1.ForwardOnNext(TTarget value)
at System.Reactive.IdentitySink`1.OnNext(T value)
at System.Reactive.AutoDetachObserver`1.OnNextCore(T value)
at System.Reactive.ObserverBase`1.OnNext(T value)
at Wetr.Simulator.viewModel.SimulationViewModel.OnSimulationResultPublished(SimulationResult measurement) in E:\programming\csharp\Weatr\Wetr.Simulator\Wetr.Simulator\viewModel\SimulationViewModel.cs:Line 96.
Код для публикации результатов показан ниже.
private void OnSimulationResultPublished(SimulationResult measurement)
{
this.aggregatedSimulationResultObserver?.OnNext(measurement);
}
Средство записи измерений использует следующий код для подписки на события.
this.measurementStreamSubscription = measurementStream
.Buffer(BUFFER_SIZE)
.Subscribe(
results => this.writer.AddMeasurementsAsync(
results
.Where(container => container != null)
.Select(container => container.Result)
)
);
Обновления пользовательского интерфейса обрабатываются по подписке, показанной ниже.
this.uiUpdateSubscription =
this.AggregatedSimulationResults
//.Select(result => new SimulationResult(
// result.Station, result.MeasurementSequenceID,
// new Measurement(
// result.Result.Station,
// result.Result.MeasurementType,
// new DateTime(result.Result.Timestamp.Ticks),
// result.Result.MeasuredValue
// )
//))
.ObserveOn(DispatcherScheduler.Current)
.Subscribe(this.DisplayNewSimulationResults);
Я прочитал Введение в RX , но я все еще довольно новичок в Reactive Extensions.Я также попытался создать новые копии измерений перед их буферизацией, но это не устранило проблему.Поэтому я ценю любую помощь.