IndexOutOfRangeException при публикации в реактивном потоке - PullRequest
0 голосов
/ 10 декабря 2018

Я разрабатываю приложение, которое использует подключаемую наблюдаемую для публикации результатов.Эти результаты потребляются двумя наблюдателями в двух разных потоках;один наблюдаемый прогон в потоке пользовательского интерфейса отображает результаты в таблице и на диаграмме, а другой наблюдаемый прогон в пуле потоков по умолчанию отправляет результаты в конечную точку REST.

Однако в некоторых ситуациях послепеременное время IndexOutOfRangeException выбрасывается.Я подозреваю, что это связано с проблемой параллелизма с моим потоком, который отправляет результаты в веб-службу REST.Если я не запускаю эту ветку, результаты отображаются просто отлично.Кроме того, нерегулярность вхождения является типичной для проблемы параллелизма.Я прикрепил трассировку стека исключения.

at System.Collections.Generic.List`1.Add(T item)
at System.Reactive.Linq.ObservableImpl.Buffer`1.CountExact.ExactSink.OnNext(TSource value)
at System.Reactive.Subjects.Subject`1.OnNext(T value)
at System.Reactive.Sink`1.ForwardOnNext(TTarget value)
at System.Reactive.IdentitySink`1.OnNext(T value)
at System.Reactive.AutoDetachObserver`1.OnNextCore(T value)
at System.Reactive.ObserverBase`1.OnNext(T value)
at Wetr.Simulator.viewModel.SimulationViewModel.OnSimulationResultPublished(SimulationResult measurement) in E:\programming\csharp\Weatr\Wetr.Simulator\Wetr.Simulator\viewModel\SimulationViewModel.cs:Line 96.

Код для публикации результатов показан ниже.

private void OnSimulationResultPublished(SimulationResult measurement)
{
    this.aggregatedSimulationResultObserver?.OnNext(measurement);
}

Средство записи измерений использует следующий код для подписки на события.

this.measurementStreamSubscription = measurementStream
    .Buffer(BUFFER_SIZE)
    .Subscribe(
        results => this.writer.AddMeasurementsAsync(
            results
                .Where(container => container != null)
                .Select(container => container.Result)
        )
    );

Обновления пользовательского интерфейса обрабатываются по подписке, показанной ниже.

this.uiUpdateSubscription =
    this.AggregatedSimulationResults
        //.Select(result => new SimulationResult(
        //    result.Station, result.MeasurementSequenceID,
        //    new Measurement(
        //        result.Result.Station,
        //        result.Result.MeasurementType,
        //        new DateTime(result.Result.Timestamp.Ticks),
        //        result.Result.MeasuredValue
        //    )
        //))
        .ObserveOn(DispatcherScheduler.Current)
        .Subscribe(this.DisplayNewSimulationResults);

Я прочитал Введение в RX , но я все еще довольно новичок в Reactive Extensions.Я также попытался создать новые копии измерений перед их буферизацией, но это не устранило проблему.Поэтому я ценю любую помощь.

...