Как я могу обрабатывать сообщения асинхронно, выбрасывая новые сообщения во время обработки? - PullRequest
3 голосов
/ 02 марта 2010

У меня есть приложение на C #, которое подписывается на тему в нашей системе обмена сообщениями для обновления значений. Когда приходит новое значение, я делаю некоторую обработку и затем продолжаю. Проблема в том, что обновления могут поступать быстрее, чем приложение может их обработать. То, что я хочу сделать, это просто удерживать последнее значение, поэтому я не хочу очереди. Например, источник публикует значение «1», и мое приложение получает его; во время обработки источник публикует последовательность (2, 3, 4, 5) до того, как мое приложение завершит обработку; мое приложение обрабатывает значение "5", а предыдущие значения отбрасываются.

Трудно опубликовать пример рабочего кода, так как он основан на проприетарных библиотеках обмена сообщениями, но я думаю, что это общий шаблон, я просто не могу понять, как он называется ... Это похоже на функцию обработки должен выполняться в отдельном потоке, чем обратный вызов обмена сообщениями, но я не уверен, как это организовать, например как этот поток уведомляется об изменении значения. Какие-нибудь общие советы о том, что мне нужно делать?

Ответы [ 5 ]

2 голосов
/ 02 марта 2010

Очень простым способом может быть что-то вроде:

private IMessage _next;

public void ReceiveMessage(IMessage message)
{
    Interlocked.Exchange(ref _next, message);
}

public void Process()
{
    IMessage next = Interlocked.Exchange(ref _next, null);

    if (next != null)
    {
        //...
    }
}
1 голос
/ 02 марта 2010

Вообще говоря, для предотвращения потери сообщений используется система обмена сообщениями. Моей первоначальной реакцией на решение будет поток для получения входящих данных, который пытается передать их в ваш поток обработки, если поток обработки уже запущен, вы отбрасываете данные и ждете следующего элемента и повторяете.

0 голосов
/ 02 марта 2010

Простой способ - использовать переменную-член для хранения последнего полученного значения и обернуть его блокировкой. Другой способ - поместить входящие значения в стек. Когда вы будете готовы к новому значению, вызовите Stack.Pop (), а затем Stack.Clear ():

public static class Incoming
{
    private static object locker = new object();
    private static object lastMessage = null;

    public static object GetMessage()
    {
        lock (locker)
        {
            object tempMessage = lastMessage;
            lastMessage = null;
            return tempMessage;
        }
    }
    public static void SetMessage(object messageArg)
    {
        lock (locker)
        {
            lastMessage = messageArg;
        }
    }

    private static Stack<object> messageStack = new Stack<object>();
    public static object GetMessageStack()
    {
        lock (locker)
        {
            object tempMessage = messageStack.Count > 0 ? messageStack.Pop() : null;
            messageStack.Clear();
            return tempMessage;
        }
    }
    public static void SetMessageStack(object messageArg)
    {
        lock (locker)
        {
            messageStack.Push(messageArg);
        }
    }
}

Хорошая идея - поместить функции обработки в отдельный поток. Либо используйте метод обратного вызова из потока обработки, чтобы сообщить, что он готов к другому сообщению, либо сделайте так, чтобы он сигнализировал о том, что оно сделано, и затем попросите основной поток запустить новый поток процессора при получении сообщения (через вышеуказанное SetMessage ...) .

0 голосов
/ 02 марта 2010

Очевидно, что дизайн библиотеки сообщений может повлиять на лучший способ решения этой проблемы. Как я делал это в прошлом с несколько похожими функциональными библиотеками: у меня есть поток, который прослушивает события и помещает их в очередь, а затем у меня есть работники Threadpool, которые удаляют сообщения и обрабатывают их.

Вы можете прочитать о многопоточных асинхронных очередях заданий:

Многопоточная очередь заданий

Потоки очереди работ

0 голосов
/ 02 марта 2010

Это не «шаблон», но вы можете использовать общую структуру данных для хранения значения. Если из библиотеки сообщений получено только одно значение, тогда подойдет простой объект. В противном случае вы можете использовать хеш-таблицу для хранения нескольких значений сообщения (если требуется).

Например, в потоке получения сообщений: когда приходит сообщение, добавьте / обновите структуру данных с ее значением. Со стороны потока, вы можете периодически проверять эту структуру данных, чтобы убедиться, что у вас все еще есть то же значение. Если вы этого не сделаете, откажитесь от любой обработки, которую вы уже сделали, и повторно обработайте ее с новым значением.

Конечно, вам необходимо убедиться, что структура данных правильно синхронизирована между потоками.

...