Я использую сервисный брокер в качестве системы обмена сообщениями для планирования и запуска заданий. Задание Eash состоит из нескольких задач или этапов, называемых движками.
Моими объектами брокера служб являются:
- Типы сообщений: SubmitJob, JobResponse, SubmitTask, TaskResponse
- Контракты: JobContract, TaskContract
- Очереди: ClientQueue, JobQueue, EngineQueue, ExternalActivatorQueue
- Услуги: ClientService, JobService, EngineService, ExternalActivatorService
- Уведомление о событии: EventNotificationEngineQueue
У меня есть внутренняя активация (сохраненная процедура) в очереди заданий. Для SubmitJob MessageTypes сохраненный процесс получает первую задачу для этого задания, запускает диалог с EngineService и отправляет сообщение в эту очередь (StartTask). Для TaskResponses MessageType я проверяю, есть ли еще задачи для этой работы, если есть Затем они отправляются в EngineQueue, если нет, то задача для этого задания завершена (отправить сообщение и очистить.)
Кажется, все работает отлично. Однако я хочу иметь внешнее приложение (движок), которое будет обрабатывать сообщения EngineQueue. Поэтому я использую внешний механизм активации Microsoft (ssbeas.exe). Это заняло много времени, но я наконец-то заставил его работать. Сообщение поступает в EngineQueue, EventNotificationEngineQueue запускает мое приложение и опустошает очередь. Все идет нормально. Тем не менее, мое приложение работает несколько раз. Мое тестовое приложение настроено на отправку электронной почты после его завершения. Несмотря на то, что я отправляю только одну работу с одной задачей, я получаю несколько электронных писем (указывая, что программа запускалась несколько раз.)
Вот код моего приложения (vb.net) (брокер - это объект, который инкапсулирует сервисы сервисного брокера (отправка, получение и т. Д.):
While True
oBroker.tran = oBroker.cnn.BeginTransaction
oBroker.Receive("EMGQueue", msgType, msg, serviceInstance, dialogHandle)
If dialogHandle = System.Guid.Empty Then
'Console.WriteLine("An Error Occurred. Program Terminated.")
oBroker.tran.Commit()
Exit While
End If
ConsoleWriteLine("Received: " & msgType)
If (msg Is Nothing) Then
ConsoleWriteLine("commiting and exiting")
oBroker.tran.Commit()
Exit While
Else
Select Case (msgType)
Case "SubmitTask"
ProcessMsg(oBroker.cnn, oBroker.tran, msgType, msg, iTaskID, iTaskKey)
oBroker.Send(dialogHandle, "<TaskStatus>1</TaskStatus>'")
Case "http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog"
oBroker.EndDialog(dialogHandle)
Case "http://schemas.microsoft.com/SQL/ServiceBroker/Error"""
oBroker.EndDialog(dialogHandle)
End Select
End If
ConsoleWriteLine("commiting...")
oBroker.tran.Commit()
End While
Я не понимаю, почему приложение запускается несколько раз, но кроме этого, я не понимаю, почему последующие версии все еще могут видеть сообщение в очереди. Ведь первое воплощение должно было заблокировать сообщение в очереди. Он блокирует очередь, потому что я смог протестировать с помощью диспетчера запросов, чтобы попытаться получить сообщение, когда мое приложение работало, и оно было заблокировано.
Я попытался поиграть со значениями параллелизма в EAService.config. Когда я установил его
min = "0" и max = "1" Я уменьшил количество раз, которое приложение, по-видимому, работало до двух. Ранее, используя min = "0" и max = "10", оно показывалось как 18 копий.
Надеюсь, это имело смысл, и извините за длину. У кого-нибудь есть идеи, что здесь происходит? Я сделал ошибку в своем кодировке приложения .net?
Спасибо
Martin
Редактировать: добавление журнала, созданного после запуска Engine:
2010-02-08 09:31:39 - Main
2010-02-08 09:31:39 - Получено: SubmitTask
2010-02-08 09:31:39 - ProcessMsg
2010-02-08 09:31:39 - <Task><TaskID> 5</TaskID><TaskKey>2</TaskKey></Task>
2010-02-08 09:31:39 - DoWork
2010-02-08 09:31:39 - отправка письма
2010-02-08 09:31:40 - совершение ...
2010-02-08 09:31:40 - Спящий
2010-02-08 09:32:10 - Спящий сон завершен.
2010-02-08 09:32:10 - Основное завершение
2010-02-08 09:32:10 - Внешнее активированное приложение успешно завершается и завершает работу.
2010-02-08 09:32:10 - Main
2010-02-08 09:32:10 - Получено: SubmitTask
2010-02-08 09:32:10 - ProcessMsg
2010-02-08 09:32:10 - <Task><TaskID> 5</TaskID><TaskKey> 2</TaskKey></Task>
2010-02-08 09:32:10 - DoWork
2010-02-08 09:32:10 - отправка письма
2010-02-08 09:32:10 - коммит ...
2010-02-08 09:32:10 - Спящий
2010-02-08 09:32:40 - Спящий сон завершен.
2010-02-08 09:32:40 - Основное завершение
2010-02-08 09:32:40 - Внешнее активированное приложение завершается успешно и завершается.
Вы можете видеть, что оно проходит через все приложение дважды (основное, полученное, dowork, sendemail, полное.)
Редактировать 2: вот последнее воплощение хранимой процедуры (операторы отладки и все), которая активируется при отправке задания в очередь:
ALTER PROCEDURE [dbo].[pr_ProcessJob] AS BEGIN
DECLARE @message_type_name sysname
DECLARE @dialog uniqueidentifier
DECLARE @message_sequence_number bigint
DECLARE @error_message_sequence_number bigint
DECLARE @message_body xml
DECLARE @cgid uniqueidentifier
DECLARE @JobID int
DECLARE @Params varchar(MAX)
DECLARE @ErrorNumber bigint
DECLARE @ErrorText nvarchar(MAX)
DECLARE @TaskID int
DECLARE @TaskService varchar(100)
DECLARE @TaskKey int
DECLARE @chEngine uniqueidentifier
DECLARE @Step int
DECLARE @NextStep int
DECLARE @jobch uniqueidentifier
DECLARE @EngineMsg XML
DECLARE @TimeStarted datetime
DECLARE @TaskStatus int
-- This procedure will just sit in a loop processing Task messages in the queue
-- until the queue is empty
SET NOCOUNT ON
SET @error_message_sequence_number = -100
PRINT 'pr_ProcessJob: Start'
WHILE (1=1) BEGIN
BEGIN TRY
PRINT 'pr_ProcessJob: BEGIN TRANSACTION'
BEGIN TRANSACTION
-- first lets get the conversation group id for the next message.
WAITFOR (
GET CONVERSATION GROUP @cgid FROM [JobQueue]
), TIMEOUT 1000
IF (@@ROWCOUNT = 0) BEGIN
PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (GET CONVERSATION)'
ROLLBACK TRANSACTION
BREAK
END
PRINT @CGID
-- Inner Loop (Message Processing)
WHILE (1=1) BEGIN
-- Receive the next available message
PRINT 'Receiving Message.'
WAITFOR (
RECEIVE top(1) -- just handle one message at a time
@message_type_name=message_type_name, --the type of message received
@message_body=CAST(message_body AS XML), -- the message contents
@message_sequence_number=message_sequence_number,
@dialog = conversation_handle -- the identifier of the dialog this message was received on
FROM [JobQueue]
WHERE conversation_group_id=@cgid
), TIMEOUT 3000 -- if the queue is empty for three seconds, give up and go away
-- If we didn't get anything, the queue is empty so bail out
IF (@@ROWCOUNT = 0) BEGIN
PRINT 'pr_ProcessJob::WaitFor - No messages for conversation group bailing out'
BREAK
END --IF (@@ROWCOUNT = 0)
PRINT 'Message Received: ' + @message_type_name
SAVE TRANSACTION MessageReceivedSavePoint
-- Handle the End Conversation Message
IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') BEGIN
-- When we receive an End Dialog, we need to end also.
PRINT 'ENDING CONVERSATION'
END CONVERSATION @dialog
END -- IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') BEGIN
ELSE BEGIN
-- Handle the Conversation Error Message
IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error') BEGIN
-- We can't return anything here because the dialog at the other end is closed so just log
-- an error and close our end of the conversation.
PRINT 'ENDING CONVERSATION w/Error'
END CONVERSATION @dialog
END -- (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
ELSE BEGIN
IF (@message_type_name = 'SubmitJob') BEGIN -- Process normal Job messages..
PRINT 'pr_ProcessJob:: Message Type SubmitJob received.'
-- Pull the information out of the task message with XQuery
SELECT @JobID = @message_body.value('(/Job/JobID)[1]', 'int'),
@Params = @message_body.value('(/Job/Params)[1]', 'varchar(MAX)')
PRINT 'pr_ProcessJob::@JobID = ' + cast(@jobID as varchar(10))
PRINT 'pr_ProcessJob::@Params = ' + @Params
SELECT @ErrorNumber = 0, @ErrorText = N''
-- Do something with the job
-- save state
-- we are looking for the first step
SET @Step=1
PRINT 'Selecting from JobTask'
---------------------------------------------------------
-- Get the next task
---------------------------------------------------------
SELECT TOP 1
@TaskID=task.TaskID,
@TaskService=tt.TaskService,
@TaskKey =Task.TaskKey
FROM JobTask task INNER JOIN TaskType tt
ON task.TaskTypeID = tt.TaskTypeID
WHERE task.jobID=@JobID AND task.enabled=1 and task.step>=@step
ORDER BY Task.step
---------------------------------------------------------
PRINT 'Selecting from JobTask: complete'
PRINT 'Step='+cast(@step as varchar(max))
PRINT 'TaskID='+cast(@TaskID as varchar(max))
PRINT 'TaskService='+cast(@TaskService as varchar(max))
PRINT'TaskKey='+cast(@TaskKey as varchar(max))
PRINT 'BEGIN DIALOG with ' + @TaskService
BEGIN DIALOG @chEngine
FROM SERVICE [JobService]
TO SERVICE @TaskService
ON CONTRACT [TaskContract]
WITH RELATED_CONVERSATION=@dialog;
PRINT 'BEGIN DIALOG with ' + @TaskService+' completed.'
SET @EngineMsg = CAST('<Task><TaskID>'+ str(@TaskID)+'</TaskID><TaskKey>'+ str(@Taskkey)+'</TaskKey></Task>' as XML);
PRINT CAST(@EngineMsg as varchar(max))
PRINT 'Sending Message Type SubmitTask to Engine.';
SEND ON CONVERSATION @chEngine
MESSAGE TYPE SubmitTask
(@EngineMsg)
PRINT 'Inserting into jobstate'
INSERT INTO JobState(cgid, jobch, jobID, step) VALUES(@cgid, @dialog, @jobid, @step)
END -- IF (@message_type_name = 'SubmitJob')
ELSE BEGIN
IF (@message_type_name = 'TaskResponse') BEGIN
PRINT 'Processing MessageType TaskResponse'
SELECT @TaskStatus = @message_body.value('(/TaskStatus)[1]', 'int')
PRINT 'pr_ProcessJob::@TaskStatus = ' + cast(@TaskStatus as varchar(10))
PRINT 'Loading State'
--LoadState
SELECT @JobID=jobid,
@Step=Step,
@jobch=jobch,
@TimeStarted=sysdate
FROM Jobstate
WHERE cgid=@cgid
PRINT 'Loading State complete'
PRINT @jobch
PRINT 'Selecting from JobTask'
---------------------------------------------------------
-- Get the next task
---------------------------------------------------------
SELECT TOP 1
@TaskID=task.TaskID,
@TaskService=tt.TaskService,
@TaskKey =task.TaskKey,
@NextStep = task.Step
FROM JobTask task INNER JOIN TaskType tt
ON task.TaskTypeID = tt.TaskTypeID
WHERE task.jobID=@JobID AND task.enabled=1 and task.step>@step
ORDER BY Task.step
---------------------------------------------------------
PRINT 'Selecting from JobTask: complete'
PRINT 'NextTask: ['+@TaskService+']'
if (@TaskService is null) BEGIN
PRINT '@TaskService is NULL: BEGIN'
-- no more tasks
--END CONVERSATION @jobch
PRINT 'Removing from state table'
DELETE FROM JobState
WHERE @cgid=cgid
PRINT @@ROWCOUNT
PRINT 'Removing from state table-completed'
DECLARE @ResponseDoc xml
-- Send a response message saying we're done
DECLARE @Time nvarchar(100)
SET @Time = cast(getdate() as nvarchar(100))
DECLARE @TimeStartedText nvarchar(100)
SET @TimeStartedText = cast(@TimeStarted as nvarchar(100))
SET @ResponseDoc = N'<Job/>'
SET @ResponseDoc.modify(
'insert (<JobID>{ sql:variable("@JobID") }</JobID>,
<JobStatus>{ sql:variable("@ErrorNumber") }</JobStatus>,
<ErrorNumber>{ sql:variable("@ErrorNumber") }</ErrorNumber>,
<ErrorText>{ sql:variable("@ErrorText") }</ErrorText>,
<TimeStarted>{ sql:variable("@TimeStartedText") }</TimeStarted>,
<TimeCompleted>{ sql:variable("@Time") }</TimeCompleted>)
as last into /Job [1]');
SEND ON CONVERSATION @jobch
MESSAGE TYPE [JobResponse] (@ResponseDoc)
END CONVERSATION @jobch
PRINT '@TaskService is NULL: END'
END --if (@TaskService is null) BEGIN
ELSE BEGIN
-- there are more tasks
PRINT '@TaskService is not null: BEGIN'
PRINT 'BEGIN DIALOG with ' + @TaskService
--another task
BEGIN DIALOG @chEngine
FROM SERVICE [JobService]
TO SERVICE @TaskService
ON CONTRACT [TaskContract]
WITH RELATED_CONVERSATION=@dialog;
SET @EngineMsg = CAST('<Task><TaskID>'+ str(@TaskID)+'</TaskID><TaskKey>'+ str(@Taskkey)+'</TaskKey></Task>' as XML);
PRINT 'SEND ' +cast(@EngineMsg as varchar(max));
SEND ON CONVERSATION @chEngine
MESSAGE TYPE SubmitTask (@EngineMsg)
PRINT 'SAVING State: ' +str(@step)
-- save state
Update JobState
SET step = @NextStep
FROM JobState
WHERE cgid=@cgid
PRINT '@TaskService is not null: END'
END -- ELSE (@TaskService is NOT NULL)
PRINT 'Processing MessageType TaskResponse...Complete'
END -- IF (@message_type_name = 'TaskCompleted')
END -- ELSE IF (@message_type_name <> 'JobRequest')
END -- ELSE (@message_type_name <> 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
END -- ELSE (@message_type_name <> 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
END -- WHILE (1=1) BEGIN
PRINT 'COMMIT TRANSACTION'
COMMIT TRANSACTION
END TRY
BEGIN CATCH
--rollback transaction
DECLARE @ErrNum int
DECLARE @ErrMsg varchar(max)
SELECT
ERROR_NUMBER() AS ErrorNumber
,ERROR_SEVERITY() AS ErrorSeverity
,ERROR_STATE() AS ErrorState
,ERROR_PROCEDURE() AS ErrorProcedure
,ERROR_LINE() AS ErrorLine
,ERROR_MESSAGE() AS ErrorMessage;
PRINT 'pr_ProcessJob: ROLLBACK (CATCH)'
if (error_number()=1205) BEGIN
-- a deadlock occurred. We can try it again.
PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (CATCH)'
ROLLBACK TRANSACTION
--CONTINUE
END --if (error_number()=1205)
ELSE BEGIN
if (error_number()=9617) BEGIN
PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (CATCH)'
ROLLBACK TRANSACTION
END
ELSE BEGIN -- (error_number()<>9617)
-- another error occurred. The message cant be procesed sucessfully
PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION to MessageReceivedSavePoint (CATCH)'
ROLLBACK TRANSACTION MessageReceivedSavePoint
END --ELSE (error_number()<>9617)
END -- if (error_number()<>1205)
END CATCH
END -- while loop
PRINT 'pr_ProcessJob: Complete'
END -- CREATE PROCEDURE [dbo].[ProcessJobProc]