Создать асинхронную последовательность с фиксированным размером или по истечении времени - PullRequest
0 голосов
/ 06 мая 2018

Следующий метод вызывается несколько раз в секунду, и я должен уведомить подписчика, когда наступило несколько событий или по истечении времени:

public async Task SendEventsAsync(IEnumerable<Event> events)
{
    foreach(var event in events)
    {
        // Notify my subscriber
    }
}

Метод принадлежит одиночному и будет вызываться на протяжении всего жизненного цикла приложения.

Я не хочу уведомлять по одному. Я хотел бы подождать большого количества этих данных, прежде чем уведомить моего наблюдателя (например, 10000 единиц), и, если с момента последнего уведомления прошло определенное время, тоже сообщите (например, каждую минуту). На самом деле мне не нужно отправлять свои объекты событий подписчику, просто сообщите, что получен новый набор событий.

Я пробовал Reactive Extensions, и я думаю, что это может помочь мне:

var source = Observable.Interval(TimeSpan.FromMilliseconds(1));

var idealBatchSize = 10000;
var maxTimeDelay = TimeSpan.FromSeconds(60);
source
   .Buffer(maxTimeDelay, idealBatchSize)
   .Subscribe(buffer => 
        Console.WriteLine("Buffer of {1} @ {0}", DateTime.Now, buffer.Count));

Этот код создает бесконечную последовательность с постоянным интервалом времени, а затем проецируется в буфер с фиксированным размером и максимальным временем ожидания, что-то похожее на то, что я ищу. Но моя последовательность должна передаваться из моего метода вместо фиксированного интервала времени.

Я не знаю, как мои части должны совмещаться, какую последовательность наблюдений мне нужно создать. Или, может быть, я неправильно фокусируюсь на проблеме.

Может быть достаточно последовательности с интервалом и статическим счетчиком?

1 Ответ

0 голосов
/ 07 мая 2018

Я думаю, у вас правильная идея. Я предполагаю, что у вас есть эта структура:

public class EventNotification { }

public class SingletonThing
{
    //don't need async for Rx
    public void SendEventsAsync(IEnumerable<EventNotification> events)
    {
        foreach (var element in events)
        {
        }
    }
}

Все, что вам нужно сделать, это перенаправить события в наблюдаемую область, тогда вы можете использовать Buffer для пакетирования их по своему усмотрению:

public class SingletonThing
{
    private readonly Subject<EventNotification> _subject = new Subject<EventNotification>();
    public void SendEventsAsync(IEnumerable<EventNotification> events)
    {
        foreach (var element in events)
        {
            _subject.OnNext(element);
        }
    }

    //not necessary to expose, but could be helpful
    public IObservable<EventNotification> AllEvents => _subject.AsObservable();

    private int idealBatchSize = 10000;
    private TimeSpan maxTimeDelay = TimeSpan.FromSeconds(60);
    public IObservable<IList<EventNotification>> BatchedEvents => _subject
        .Buffer(maxTimeDelay, idealBatchSize);
}

Код подписки будет вызываться следующим образом:

SingletonThing.BatchedEvents.Subscribe(buffer =>
    Console.WriteLine("Buffer of {1} @ {0}", DateTime.Now, buffer.Count)
);
...