.net Rx: упорядоченная пакетная обработка сообщений - PullRequest
3 голосов
/ 01 февраля 2011

Я пытаюсь реализовать асинхронный рабочий процесс с использованием Rx, и, похоже, я делаю это совершенно неправильно.

То, что я хотел бы сделать, это:

From an undefined asynchronous stream of un-parsed message strings (i.e. an IObservable<string>)
parse the message strings asynchronously, but preserve their order. (IObservable<Message>)
Batch up parsed Messages in groups of 100 or so (IObservable<IEnumerable<Message>>)
Send each batch, when complete, to the UI thread to be processed. Batches must arrive in the same order they were started.

Кажется, я не могу получить сохранение порядка, а также Rx, кажется, не выполняет действия асинхронно, когдаожидал их.

Я попытался сохранить порядок, используя IEnumerable вместо IObservable, а затем вызвав для него операторы .AsParallel (). AsOrdered ().Вот кодСм. Примечания ниже для проблем, которые у меня возникают:

    private IObservable<IEnumerable<Message>> messageSource;
    public IObservable<IEnumerable<Message>> MessageSource { get { return messageSource; } }

    /// <summary>
    /// Sub-classes of MessageProviderBase provide this IEnumerable to 
    /// generate unparsed message strings synchronously
    /// </summary>
    protected abstract IEnumerable<string> UnparsedMessages { get; }

    public MessageProviderBase()
    {
        // individual parsed messages as a PLINQ query
        var parsedMessages = from unparsedMessage in UnparsedMessages.AsParallel().AsOrdered()
                                                 select ParseMessage(unparsedMessage);

        // convert the above PLINQ query to an observable, buffering up to 100 messages at a time
        var batchedMessages
            = parsedMessages.ToObservable().BufferWithTimeOrCount(TimeSpan.FromMilliseconds(200), 100);

        // ISSUE #1:
        // batchedMessages seems to call OnNext before all of the messages in its buffer are parsed.
        // If you convert the IObservable<Message> it generates to an enumerable, it blocks
        // when you try to enumerate it. 

        // Convert each batch to an IEnumerable
        // ISSUE #2: Even if the following Rx query were to run asynchronously (it doesn't now, see the above comment),
        // it could still deliver messages out of order. Only, instead of delivering individual
        // messages out of order, the message batches themselves could arrive out of order.
        messageSource = from messageBatch in batchedMessages
                                        select messageBatch.ToEnumerable().ToList();
    }

Ответы [ 2 ]

3 голосов
/ 03 февраля 2011

Мой ответ, приведенный ниже, в некоторой степени основан на коде Enigmativity, но исправляет ряд условий гонки, связанных с завершением, а также добавляет поддержку отмены и пользовательских планировщиков (что значительно упростит модульное тестирование).

public static IObservable<U> Fork<T, U>(this IObservable<T> source,
    Func<T, U> selector)
{
    return source.Fork<T, U>(selector, Scheduler.TaskPool);
}

public static IObservable<U> Fork<T, U>(this IObservable<T> source,
    Func<T, U> selector, IScheduler scheduler)
{
    return Observable.CreateWithDisposable<U>(observer =>
    {
        var runningTasks = new CompositeDisposable();

        var lockGate = new object();
        var queue = new Queue<ForkTask<U>>();
        var completing = false;
        var subscription = new MutableDisposable();

        Action<Exception> onError = ex =>
        {
            lock(lockGate)
            {
                queue.Clear();
                observer.OnError(ex);
            }
        };

        Action dequeue = () =>
        {
            lock (lockGate)
            {
                var error = false;
                while (queue.Count > 0 && queue.Peek().Completed)
                {
                    var task = queue.Dequeue();
                    observer.OnNext(task.Value);
                }
                if (completing && queue.Count == 0)
                {
                    observer.OnCompleted();
                }
            }
        };

        Action onCompleted = () =>
        {
            lock (lockGate)
            {
                completing = true;
                dequeue();
            }
        };

        Action<T> enqueue = t =>
        {
            var cancellation = new MutableDisposable();
            var task = new ForkTask<U>();

            lock(lockGate)
            {
                runningTasks.Add(cancellation);
                queue.Enqueue(task);
            }

            cancellation.Disposable = scheduler.Schedule(() =>
            {
                try
                {
                    task.Value = selector(t);

                    lock(lockGate)
                    {
                        task.Completed = true;
                        runningTasks.Remove(cancellation);
                        dequeue();
                    }
                }
                catch(Exception ex)
                {
                    onError(ex);
                }
            });
        };

        return new CompositeDisposable(runningTasks, 
            source.AsObservable().Subscribe(
                t => { enqueue(t); },
                x => { onError(x); },
                () => { onCompleted(); }
            ));
    });
}

private class ForkTask<T>
{
    public T Value = default(T);
    public bool Completed = false;
}

Вот пример, который рандомизирует время выполнения задачи для его проверки:

AutoResetEvent are = new AutoResetEvent(false);

Random rand = new Random();

Observable.Range(0, 5)
    .Fork(i =>
    {
        int delay = rand.Next(50, 500);
        Thread.Sleep(delay);

        return i + 1;
    })
    .Subscribe(
        i => Console.WriteLine(i),
        () => are.Set()
    );

are.WaitOne();

Console.ReadLine();
2 голосов
/ 02 февраля 2011

Если у вас есть:

IObservable<string> UnparsedMessages = ...;
Func<string, Message> ParseMessage = ...;

Тогда вы можете использовать метод расширения SelectAsync, например:

IObservable<Message> ParsedMessages = UnparsedMessages.SelectAsync(ParseMessage);

Метод расширения SelectAsync обрабатывает каждое непарсированное сообщение асинхронно и обеспечивает возврат результатов в том порядке, в котором они были получены.

Дайте мне знать, если это делает то, что вам нужно.

Вот код:

public static IObservable<U> SelectAsync<T, U>(this IObservable<T> source,
    Func<T, U> selector)
{
    var subject = new Subject<U>();
    var queue = new Queue<System.Threading.Tasks.Task<U>>();
    var completing = false;
    var subscription = (IDisposable)null;

    Action<Exception> onError = ex =>
    {
        queue.Clear();
        subject.OnError(ex);
        subscription.Dispose();
    };

    Action dequeue = () =>
    {
        lock (queue)
        {
            var error = false;
            while (queue.Count > 0 && queue.Peek().IsCompleted)
            {
                var task = queue.Dequeue();
                if (task.Exception != null)
                {
                    error = true;
                    onError(task.Exception);
                    break;
                }
                else
                {
                    subject.OnNext(task.Result);
                }
            }
            if (!error && completing && queue.Count == 0)
            {
                subject.OnCompleted();
                subscription.Dispose();
            }
        }
    };

    Action<T> enqueue = t =>
    {
        if (!completing)
        {
            var task = new System.Threading.Tasks.Task<U>(() => selector(t));
            queue.Enqueue(task);
            task.ContinueWith(tu => dequeue());
            task.Start();
        }
    };

    subscription = source.Subscribe(
        t => { lock(queue) enqueue(t); },
        x => { lock(queue) onError(x); },
        () => { lock(queue) completing = true; });

    return subject.AsObservable();
}

В итоге мне пришлось вернуться к этой работе для работы, и я написал более надежную версию этого кода (основанную также на ответе Ричарда).

Ключевым преимуществом этого кода является отсутствие какой-либо явной очереди. Я просто использую продолжение задач, чтобы привести результаты в порядок. Работает как удовольствие!

public static IObservable<U> ForkSelect<T, U>(this IObservable<T> source, Func<T, U> selector)
{
    return source.ForkSelect<T, U>(t => Task<U>.Factory.StartNew(() => selector(t)));
}

public static IObservable<U> ForkSelect<T, U>(this IObservable<T> source, Func<T, Task<U>> selector)
{
    if (source == null) throw new ArgumentNullException("source");
    if (selector == null) throw new ArgumentNullException("selector");
    return Observable.CreateWithDisposable<U>(observer =>
    {
        var gate = new object();
        var onNextTask = Task.Factory.StartNew(() => { });
        var sourceCompleted = false;
        var taskErrored = false;

        Action<Exception> onError = ex =>
        {
            sourceCompleted = true;
            onNextTask = onNextTask.ContinueWith(t => observer.OnError(ex));
        };

        Action onCompleted = () =>
        {
            sourceCompleted = true;
            onNextTask = onNextTask.ContinueWith(t => observer.OnCompleted());
        };

        Action<T> onNext = t =>
        {
            var task = selector(t);
            onNextTask = Task.Factory.ContinueWhenAll(new[] { onNextTask, task }, ts =>
            {
                if (!taskErrored)
                {
                    if (task.IsFaulted)
                    {
                        taskErrored = true;
                        observer.OnError(task.Exception);
                    }
                    else
                    {
                        observer.OnNext(task.Result);
                    }
                }
            });
        };

        var subscription = source
            .AsObservable()
            .Subscribe(
                t => { if (!sourceCompleted) lock (gate) onNext(t); },
                ex => { if (!sourceCompleted) lock (gate) onError(ex); },
                () => { if (!sourceCompleted) lock (gate) onCompleted(); });

        var @return = new CompositeDisposable(subscription);

        return @return;
    });
}

И перегрузки SelectMany, позволяющие использовать LINQ:

public static IObservable<U> SelectMany<T, U>(this IObservable<T> source, Func<T, Task<U>> selector)
{
    return source.ForkSelect<T, U>(selector);
}

public static IObservable<V> SelectMany<T, U, V>(this IObservable<T> source, Func<T, Task<U>> taskSelector, Func<T, U, V> resultSelector)
{
    if (source == null) throw new ArgumentNullException("source");
    if (taskSelector == null) throw new ArgumentNullException("taskSelector");
    if (resultSelector == null) throw new ArgumentNullException("resultSelector");
    return source.Zip(source.ForkSelect<T, U>(taskSelector), (t, u) => resultSelector(t, u));
}

Таким образом, эти методы теперь можно использовать так:

var observableOfU = observableOfT.ForkSelect(funcOfT2U);

Или:

var observableOfU = observableOfT.ForkSelect(funcOfT2TaskOfU);

Или:

var observableOfU =
    from t in observableOfT
    from u in funcOfT2TaskOfU(t)
    select u;

Наслаждайтесь!

...