Как написать транзакционный многопоточный сервис WCF, использующий MSMQ - PullRequest
2 голосов
/ 30 мая 2009

У меня есть служба WCF, которая отправляет сообщения в личную нетранзакционную очередь MSMQ. У меня есть другой сервис WCF (многопоточный), который обрабатывает сообщения MSMQ и вставляет их в базу данных.

Моя проблема связана с секвенированием. Я хочу, чтобы сообщения были в определенном порядке. Например, MSG-A необходимо перейти в базу данных перед вставкой MSG-B. Так что мое текущее решение для этого является очень грубым и дорогим с точки зрения базы данных.

Я читаю сообщение, если его MSG-B и в базе данных нет MSG-A, я возвращаю его обратно в очередь сообщений и продолжаю делать это, пока MSG-A не будет вставлен в базу данных. Но это очень дорогая операция, так как включает сканирование таблицы (SELECT stmt).

Сообщения всегда публикуются в очереди последовательно.

Если не считать моей службы обработки очередей WCF однопоточной (если для атрибута поведения службы InstanceContextMode установлено значение Single), может кто-нибудь предложить лучшее решение?

Спасибо

Dan

Ответы [ 2 ]

1 голос
/ 09 июля 2009

Если вы являетесь единственным клиентом из этих очередей, вы можете очень просто добавить метку времени в качестве заголовка сообщения (см. IDesign образец) и сохранить поле «Отправлено» (вроде сообщения Outlook) в база данных также. Вы можете обработать их в том порядке, в котором они были отправлены (в основном вы перемещаете логику сортировки во время потребления).

Надеюсь, это поможет, Адриан

1 голос
/ 30 мая 2009

Вместо того, чтобы немедленно отправлять сообщения в БД после их удаления из очереди, сохраняйте список ожидающих сообщений в памяти. Когда вы получите A или B, проверьте, есть ли соответствующий в списке. Если это так, отправьте их оба (в правильном порядке) в базу данных и удалите соответствующий из списка. В противном случае просто добавьте новое сообщение в этот список.

Если проверка на совпадение является слишком дорогой задачей для сериализации - я полагаю, вы многопоточны по причине - у вас может быть другой поток, обрабатывающий список. Существующие несколько потоков прочитают, немедленно отправят большинство сообщений в БД, но отложат As и B в списке (threadsafe). Фоновый поток очищает этот список, находя совпадающие As и B, и, когда он находит их, он отправляет их в правильном порядке (и удаляет их из списка).

Суть в том, что, поскольку вы удаляете элементы из очереди с несколькими потоками, вам придется где-то сериализоваться, чтобы обеспечить упорядочение. Хитрость заключается в том, чтобы свести к минимуму количество и длительность времени, затрачиваемого на блокировку последовательного кода.

Также может быть что-то, что вы могли бы сделать на уровне базы данных, с помощью триггеров или чего-то другого, чтобы изменить порядок записей при обнаружении этой ситуации. Боюсь, я не знаю достаточно о программировании БД, чтобы помочь там.

ОБНОВЛЕНИЕ: Предполагая, что сообщения содержат некоторый идентификатор, который позволяет связать сообщение «А» с правильным ассоциированным сообщением «В», следующий код гарантирует, что А попадет в базу данных до Б. Обратите внимание, что он не гарантирует они являются смежными записями в базе данных - между A и B могут быть другие сообщения. Кроме того, если по какой-либо причине вы получаете A или B, не получая соответствующего сообщения другого типа, этот код будет пропускать память, поскольку он зависает на непревзойденное сообщение навсегда.

(Вы можете извлечь эти два «блокированных» блока в одну подпрограмму, но я оставлю это так для ясности в отношении A и B.)

static private object dictionaryLock = new object();
static private Dictionary<int, MyMessage> receivedA = 
    new Dictionary<int, MyMessage>();
static private Dictionary<int, MyMessage> receivedB = 
    new Dictionary<int, MyMessage>();

public void MessageHandler(MyMessage message)
{
    MyMessage matchingMessage = null;
    if (IsA(message))
    {
        InsertIntoDB(message);
        lock (dictionaryLock)
        {
            if (receivedB.TryGetValue(message.id, out matchingMessage))
            {
                receivedB.Remove(message.id);
            }
            else
            {
                receivedA.Add(message.id, message);
            }
        }
        if (matchingMessage != null)
        {
            InsertIntoDB(matchingMessage);
        }
    }
    else if (IsB(message))
    {
        lock (dictionaryLock)
        {
            if (receivedA.TryGetValue(message.id, out matchingMessage))
            {
                receivedA.Remove(message.id);
            }
            else
            {
                receivedB.Add(message.id, message);
            }
        }
        if (matchingMessage != null)
        {
            InsertIntoDB(message);
        }
    }
    else
    {
        // not A or B, do whatever
    }
}
...