Sql Server Service Broker - PullRequest
       10

Sql Server Service Broker

4 голосов
/ 22 июня 2011

В настоящее время мы используем сервисный брокер для отправки сообщений туда и обратно, что работает нормально.Но мы хотели сгруппировать эти сообщения, используя RELATED_CONVERSATION_GROUP.Мы хотели использовать нашу собственную постоянную базу данных uuid как RELATED_CONVERSATION_GROUP = @uuid из нашей базы данных, но даже при том, что мы используем один и тот же uuid каждый раз, когда Conversion_group_id меняется каждый раз, когда мы получаем очередь.

Знаете ли вы, ребятачто не так с тем, как я создаю брокера или принимающий вызов, я предоставил ниже и код создания брокера, и код получения приема.Спасибо

ниже приведен код "Код создания компонента Service Broker"

CREATE PROCEDURE dbo.OnDataInserted

@EntityType NVARCHAR(100),
@MessageID BIGINT,
@uuid uniqueidentifier,
@message_body nvarchar(max)
AS

BEGIN

SET NOCOUNT ON;

 DECLARE @conversation UNIQUEIDENTIFIER

BEGIN DIALOG CONVERSATION @conversation
FROM SERVICE DataInsertSndService
TO SERVICE 'DataInsertRcvService'
ON CONTRACT DataInsertContract
WITH RELATED_CONVERSATION_GROUP = @uuid;

SEND ON CONVERSATION @conversation
MESSAGE TYPE DataInserted
(CAST(@message_body))

Ниже приведен код "Код получения"

WHILE 0 < @@TRANCOUNT ROLLBACK; SET NOCOUNT ON

BEGIN TRANSACTION;

DECLARE 
@cID as uniqueidentifier, 
@conversationHandle as uniqueidentifier,
@conversationGroupId as uniqueidentifier,
@tempConversationGroupId as uniqueidentifier,
@message_body VARBINARY(MAX)

RAISERROR ('Awaiting Message ...', 16, 1) WITH NOWAIT

;WAITFOR (RECEIVE TOP (1) 
@cID = Substring(CAST(message_body as nvarchar(max)),4,36), 
@conversationHandle = [conversation_handle],
@conversationGroupId = [conversation_group_id],
@message_body = message_body
FROM DataInsertRcvQueue)

RAISERROR ('Message Received', 16, 1) WITH NOWAIT
Select @tempConversationGroupId = conversationGroupID from ConversationGroupMapper where cID = @cID; 
declare @temp as nvarchar(max);
Set @temp = CAST(@tempConversationGroupId as nvarchar(max));
if @temp  <> ''
BEGIN
    MOVE CONVERSATION @conversationHandle TO @tempConversationGroupId;

RAISERROR ('Moved to Existing Conversation Group' , 16, 1) WITH NOWAIT
END
    else
BEGIN
insert into ConversationGroupMapper values (@cID,@conversationGroupId);

RAISERROR ('New Conversation Group' , 16, 1) WITH NOWAIT
END

WAITFOR DELAY '000:00:10'

COMMIT

RAISERROR ('Committed' , 16, 1) WITH NOWAIT

Разработка

Наша ситуация такова, что нам нужно получать элементы из этой очереди компонента Service Broker в цикле, блокировать их в WAITFOR и передавать их в другую систему по ненадежной сети.Элементы, полученные из очереди, предназначены для одного из множества подключений к этой удаленной системе.Если элемент не был успешно доставлен в другую систему, транзакция для этого отдельного элемента должна быть отменена, и элемент будет возвращен в очередь.Мы фиксируем транзакцию при успешной доставке, разблокируя последовательность сообщений, которая будет получена при последующей итерации цикла.

Задержки в последовательности связанных элементов не должны влиять на доставку несвязанных последовательностей.Отдельные элементы отправляются в очередь, как только они становятся доступными, и отправляются немедленно.Элементы должны пересылаться в одном файле, хотя порядок доставки даже в пределах последовательности не является строго важным.

Из цикла, который получает одно сообщение за раз, новый или существующий TcpClient выбирается из нашего списка открытыхсоединения, а также сообщение и открытое соединение передаются по цепочке асинхронных обратных вызовов ввода-вывода до завершения передачи.Затем мы завершаем транзакцию БД, в которой мы получили Элемент из очереди компонента Service Broker.

Как можно использовать компонент Service Broker и группы диалога для помощи в этом сценарии?

1 Ответ

8 голосов
/ 23 июня 2011

Группы разговоров представляют собой концепцию local , используемую исключительно для блокировки: коррелированные разговоры принадлежат группе, так что, пока вы обрабатываете сообщение в одном разговоре, другой поток не может обработать коррелированное сообщение. Нет информации о группах разговоров, которыми обмениваются две конечные точки, поэтому в вашем примере все конечные точки инициатора в конечном итоге принадлежат одной группе разговоров, но каждая целевая конечная точка - это отдельная группа разговоров (каждая группа имеет только один разговор). Причина, по которой система ведет себя так, заключается в том, что группы разговоров предназначены для решения такой проблемы, как, например, служба бронирования поездки: когда она получает сообщение «забронировать поездку», она должна зарезервировать рейс, гостиницу и автомобиль. аренда. Он должен отправить три сообщения, по одному на каждую из этих служб («рейсы», «отели», «автомобили»), а затем ответы будут возвращаться асинхронно. Когда они возвращаются, обработка должна гарантировать, что они не обрабатываются одновременно отдельными потоками, каждый из которых будет пытаться обновить статус записи 'trip'. В сообщениях эта проблема известна как «проблема корреляции сообщений».

Однако часто группы разговоров развертываются в SSB исключительно из соображений производительности: они допускают более высокие результаты RECEIVE. Целевые конечные точки можно объединить в группу с помощью MOVE CONVERSATION, но на практике существует гораздо более простой прием: изменить направление разговора. Пусть ваш пункт назначения начнет диалоги (сгруппированные), а источник отправит свои "обновления" в диалог (ы), начатый пунктом назначения.

Некоторые заметки:

  • Не используйте паттерн огня и забывания BEGIN / SEND / END. Вы делаете невозможным диагностировать какие-либо проблемы в будущем, см. Огонь и забудьте: хорошо для военных, но не для разговоров Service Broker .
  • Никогда не используйте WITH CLEANUP в рабочем коде. Он предназначен для крайних административных мер, таких как аварийное восстановление. Если вы злоупотребляете ею, вы лишаете SSB шансов правильно отследить сообщение для правильной повторной доставки (если сообщение по какой-то причине подпрыгивает на цели, оно будет потеряно навсегда).
  • SSB не гарантирует порядок между разговорами, только в пределах одного разговора. Начало нового диалога для каждого события INSERT не гарантирует сохранения в целевом порядке порядка операций вставки.
...