Функция Window .Net Reactive Extension Window некорректно работает с HistoricalScheduler - PullRequest
0 голосов
/ 19 февраля 2019

У меня есть временной интервал в 1 мин, и я хотел бы объединить его с интервалами в 5 мин.Класс простого ввода для тестирования:

internal class Item
{
        public int Id { get; set; }
        public int Value { get; set; }
        public DateTime Dt { get; set; }

}

Я создал код на основе Reactive для этой задачи:

IEnumerable<Item> data = new List<Item> {
            new Item { Id = 1, Value = 1, Dt = DateTime.Parse("2018-12-03 08:00:00")},
            new Item { Id = 2, Value = 1, Dt = DateTime.Parse("2018-12-03 08:01:00")},
            new Item { Id = 3, Value = 1, Dt = DateTime.Parse("2018-12-03 08:02:00")},
            new Item { Id = 4, Value = 1, Dt = DateTime.Parse("2018-12-03 08:03:00")},
            new Item { Id = 5, Value = 1, Dt = DateTime.Parse("2018-12-03 08:04:00")},
            new Item { Id = 6, Value = 1, Dt = DateTime.Parse("2018-12-03 08:05:00")},
            new Item { Id = 6, Value = 1, Dt = DateTime.Parse("2018-12-03 08:06:00")},

            new Item { Id = 7, Value = 1, Dt = DateTime.Parse("2018-12-03 08:15:00")},
            new Item { Id = 8, Value = 1, Dt = DateTime.Parse("2018-12-03 08:16:00")},
            new Item { Id = 9, Value = 1, Dt = DateTime.Parse("2018-12-03 08:17:00")},
            new Item { Id = 10, Value = 1, Dt = DateTime.Parse("2018-12-03 08:18:00")},
            new Item { Id = 11, Value = 1, Dt = DateTime.Parse("2018-12-03 08:19:00")},
            new Item { Id = 12, Value = 1, Dt = DateTime.Parse("2018-12-03 08:20:00")},
        };

        var scheduler = new HistoricalScheduler(DateTime.Parse("2018-12-03 08:00:00"));

        var replay = Observable.Generate(
            data.GetEnumerator(),
            events => events.MoveNext(),
            events => events,
            events => events.Current,
            events => events.Current.Dt,
            scheduler);

        replay
            .Window(TimeSpan.FromMinutes(5), scheduler)
            .Subscribe(stream =>
            {
                var start = scheduler.Now;
                stream
                .DefaultIfEmpty()
                .Aggregate((last, newItem) =>
                {
                    last.Value += newItem.Value;
                    last.Dt = start.DateTime;

                    return last;
                })
                .Where(e => e != null)
                .Subscribe(s => Console.WriteLine($"v:{s.Value} t:{s.Dt}"));
            });

        scheduler.Start();

        Console.ReadKey();

Но вывод неправильный:

v:5 t:12/3/2018 8:00:00 AM
v:2 t:12/3/2018 8:05:00 AM
v:1 t:12/3/2018 8:15:00 AM
v:4 t:12/3/2018 8:15:00 AM
v:1 t:12/3/2018 8:20:00 AM

Myвопрос: почему интервал 8:15:00 дублируется?Это ошибка или особенность?:)

Кроме того, я обнаружил, что это дублирование всегда появляется после разрыва временного ряда.Похоже, первый элемент после пропуска обрабатывается как отдельный интервал окна.

Ожидаемый результат:

v:5 t:2018/12/03 08:00:00
v:2 t:2018/12/03 08:05:00
v:5 t:2018/12/03 08:15:00
v:1 t:2018/12/03 08:20:00

5 мин. Агрегированные интервалы с датой начала интервала.Почему интервал после разрыва делится на 2?

1 Ответ

0 голосов
/ 19 февраля 2019

Похоже, это побочный эффект от использования объявленной локально переменной.

Правило никогда не состоит в том, чтобы смешивать состояние в наблюдаемую.

Вот рабочий код:

IEnumerable<Item> data = new List<Item>
{
        new Item { Id = 1, Value = 1, Dt = DateTime.Parse("2018-12-03 08:00:00")},
        new Item { Id = 2, Value = 1, Dt = DateTime.Parse("2018-12-03 08:01:00")},
        new Item { Id = 3, Value = 1, Dt = DateTime.Parse("2018-12-03 08:02:00")},
        new Item { Id = 4, Value = 1, Dt = DateTime.Parse("2018-12-03 08:03:00")},
        new Item { Id = 5, Value = 1, Dt = DateTime.Parse("2018-12-03 08:04:00")},
        new Item { Id = 6, Value = 1, Dt = DateTime.Parse("2018-12-03 08:05:00")},
        new Item { Id = 6, Value = 1, Dt = DateTime.Parse("2018-12-03 08:06:00")},
        new Item { Id = 7, Value = 1, Dt = DateTime.Parse("2018-12-03 08:15:00")},
        new Item { Id = 8, Value = 1, Dt = DateTime.Parse("2018-12-03 08:16:00")},
        new Item { Id = 9, Value = 1, Dt = DateTime.Parse("2018-12-03 08:17:00")},
        new Item { Id = 10, Value = 1, Dt = DateTime.Parse("2018-12-03 08:18:00")},
        new Item { Id = 11, Value = 1, Dt = DateTime.Parse("2018-12-03 08:19:00")},
        new Item { Id = 12, Value = 1, Dt = DateTime.Parse("2018-12-03 08:20:00")},
    };

var scheduler = new HistoricalScheduler(DateTime.Parse("2018-12-03 08:00:00"));

var source =
    Observable
        .Generate(
            data.GetEnumerator(),
            events => events.MoveNext(),
            events => events,
            events => events.Current,
            events => events.Current.Dt,
            scheduler);

source
    .Window(TimeSpan.FromMinutes(5), scheduler)
    .SelectMany(stream =>
        stream
            .Aggregate(
                new Item() { Value = 0, Dt = scheduler.Now.DateTime },
                (a, x) => new Item()
                {
                    Value = a.Value + x.Value,
                    Dt = x.Dt,
                }))
    .Subscribe(s => Console.WriteLine($"v:{s.Value} t:{s.Dt}"));

scheduler.Start();

Что выводит:

v:5 t:2018/12/03 08:04:00
v:2 t:2018/12/03 08:06:00
v:1 t:2018/12/03 08:15:00
v:4 t:2018/12/03 08:19:00
v:1 t:2018/12/03 08:20:00
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...