Внешняя активация компонента Service Broker запускает приложение несколько раз - PullRequest
1 голос
/ 08 февраля 2010

Я использую сервисный брокер в качестве системы обмена сообщениями для планирования и запуска заданий. Задание Eash состоит из нескольких задач или этапов, называемых движками.

Моими объектами брокера служб являются:

  • Типы сообщений: SubmitJob, JobResponse, SubmitTask, TaskResponse
  • Контракты: JobContract, TaskContract
  • Очереди: ClientQueue, JobQueue, EngineQueue, ExternalActivatorQueue
  • Услуги: ClientService, JobService, EngineService, ExternalActivatorService
  • Уведомление о событии: EventNotificationEngineQueue

У меня есть внутренняя активация (сохраненная процедура) в очереди заданий. Для SubmitJob MessageTypes сохраненный процесс получает первую задачу для этого задания, запускает диалог с EngineService и отправляет сообщение в эту очередь (StartTask). Для TaskResponses MessageType я проверяю, есть ли еще задачи для этой работы, если есть Затем они отправляются в EngineQueue, если нет, то задача для этого задания завершена (отправить сообщение и очистить.)

Кажется, все работает отлично. Однако я хочу иметь внешнее приложение (движок), которое будет обрабатывать сообщения EngineQueue. Поэтому я использую внешний механизм активации Microsoft (ssbeas.exe). Это заняло много времени, но я наконец-то заставил его работать. Сообщение поступает в EngineQueue, EventNotificationEngineQueue запускает мое приложение и опустошает очередь. Все идет нормально. Тем не менее, мое приложение работает несколько раз. Мое тестовое приложение настроено на отправку электронной почты после его завершения. Несмотря на то, что я отправляю только одну работу с одной задачей, я получаю несколько электронных писем (указывая, что программа запускалась несколько раз.)

Вот код моего приложения (vb.net) (брокер - это объект, который инкапсулирует сервисы сервисного брокера (отправка, получение и т. Д.):

While True

            oBroker.tran = oBroker.cnn.BeginTransaction

            oBroker.Receive("EMGQueue", msgType, msg, serviceInstance, dialogHandle)

            If dialogHandle = System.Guid.Empty Then
                'Console.WriteLine("An Error Occurred. Program Terminated.")
                oBroker.tran.Commit()
                Exit While
            End If

            ConsoleWriteLine("Received: " & msgType)

            If (msg Is Nothing) Then
                ConsoleWriteLine("commiting and exiting")
                oBroker.tran.Commit()
                Exit While

            Else
                Select Case (msgType)
                    Case "SubmitTask"
                        ProcessMsg(oBroker.cnn, oBroker.tran, msgType, msg, iTaskID, iTaskKey)
                        oBroker.Send(dialogHandle, "<TaskStatus>1</TaskStatus>'")

                    Case "http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog"
                        oBroker.EndDialog(dialogHandle)

                    Case "http://schemas.microsoft.com/SQL/ServiceBroker/Error"""
                        oBroker.EndDialog(dialogHandle)

                End Select
            End If


            ConsoleWriteLine("commiting...")
            oBroker.tran.Commit()

        End While

Я не понимаю, почему приложение запускается несколько раз, но кроме этого, я не понимаю, почему последующие версии все еще могут видеть сообщение в очереди. Ведь первое воплощение должно было заблокировать сообщение в очереди. Он блокирует очередь, потому что я смог протестировать с помощью диспетчера запросов, чтобы попытаться получить сообщение, когда мое приложение работало, и оно было заблокировано.

Я попытался поиграть со значениями параллелизма в EAService.config. Когда я установил его min = "0" и max = "1" Я уменьшил количество раз, которое приложение, по-видимому, работало до двух. Ранее, используя min = "0" и max = "10", оно показывалось как 18 копий.

Надеюсь, это имело смысл, и извините за длину. У кого-нибудь есть идеи, что здесь происходит? Я сделал ошибку в своем кодировке приложения .net?

Спасибо Martin

Редактировать: добавление журнала, созданного после запуска Engine:

2010-02-08 09:31:39 - Main
2010-02-08 09:31:39 - Получено: SubmitTask
2010-02-08 09:31:39 - ProcessMsg
2010-02-08 09:31:39 - <Task><TaskID> 5</TaskID><TaskKey>2</TaskKey></Task>
2010-02-08 09:31:39 - DoWork
2010-02-08 09:31:39 - отправка письма
2010-02-08 09:31:40 - совершение ...
2010-02-08 09:31:40 - Спящий
2010-02-08 09:32:10 - Спящий сон завершен.
2010-02-08 09:32:10 - Основное завершение
2010-02-08 09:32:10 - Внешнее активированное приложение успешно завершается и завершает работу.
2010-02-08 09:32:10 - Main
2010-02-08 09:32:10 - Получено: SubmitTask
2010-02-08 09:32:10 - ProcessMsg
2010-02-08 09:32:10 - <Task><TaskID> 5</TaskID><TaskKey> 2</TaskKey></Task>
2010-02-08 09:32:10 - DoWork
2010-02-08 09:32:10 - отправка письма
2010-02-08 09:32:10 - коммит ...
2010-02-08 09:32:10 - Спящий
2010-02-08 09:32:40 - Спящий сон завершен.
2010-02-08 09:32:40 - Основное завершение
2010-02-08 09:32:40 - Внешнее активированное приложение завершается успешно и завершается.

Вы можете видеть, что оно проходит через все приложение дважды (основное, полученное, dowork, sendemail, полное.)

Редактировать 2: вот последнее воплощение хранимой процедуры (операторы отладки и все), которая активируется при отправке задания в очередь:

ALTER PROCEDURE [dbo].[pr_ProcessJob] AS BEGIN

    DECLARE     @message_type_name      sysname
    DECLARE     @dialog             uniqueidentifier 
    DECLARE     @message_sequence_number    bigint
    DECLARE     @error_message_sequence_number  bigint
    DECLARE     @message_body           xml
    DECLARE     @cgid               uniqueidentifier
    DECLARE     @JobID              int
    DECLARE     @Params             varchar(MAX)

    DECLARE     @ErrorNumber            bigint
    DECLARE     @ErrorText          nvarchar(MAX)

    DECLARE         @TaskID             int 
    DECLARE     @TaskService            varchar(100)
    DECLARE     @TaskKey            int
    DECLARE     @chEngine           uniqueidentifier
    DECLARE     @Step               int
    DECLARE     @NextStep           int
    DECLARE     @jobch              uniqueidentifier
    DECLARE     @EngineMsg          XML
    DECLARE     @TimeStarted            datetime
    DECLARE     @TaskStatus         int

    --  This procedure will just sit in a loop processing Task messages in the queue
    --  until the queue is empty
    SET NOCOUNT ON
    SET @error_message_sequence_number = -100


    PRINT 'pr_ProcessJob: Start'

    WHILE (1=1) BEGIN

        BEGIN TRY

            PRINT 'pr_ProcessJob: BEGIN TRANSACTION'
            BEGIN TRANSACTION

            -- first lets get the conversation group id for the next message.
            WAITFOR (
                GET CONVERSATION GROUP @cgid FROM [JobQueue]
            ), TIMEOUT 1000

            IF (@@ROWCOUNT = 0) BEGIN

                PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (GET CONVERSATION)'
                ROLLBACK TRANSACTION
                BREAK
            END

            PRINT @CGID

            -- Inner Loop (Message Processing)
            WHILE (1=1) BEGIN

                -- Receive the next available message
                PRINT 'Receiving Message.'
                WAITFOR (
                    RECEIVE top(1) -- just handle one message at a time
                        @message_type_name=message_type_name,  --the type of message received
                        @message_body=CAST(message_body AS XML),      -- the message contents
                        @message_sequence_number=message_sequence_number,
                        @dialog = conversation_handle    -- the identifier of the dialog this message was received on
                        FROM [JobQueue]
                        WHERE conversation_group_id=@cgid
                ), TIMEOUT 3000  -- if the queue is empty for three seconds, give up and go away

                -- If we didn't get anything, the queue is empty so bail out

                IF (@@ROWCOUNT = 0) BEGIN               
                    PRINT 'pr_ProcessJob::WaitFor - No messages for conversation group bailing out'
                    BREAK
                END --IF (@@ROWCOUNT = 0)

                PRINT 'Message Received: ' + @message_type_name
                SAVE TRANSACTION MessageReceivedSavePoint

                -- Handle the End Conversation Message
                IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') BEGIN
                    -- When we receive an End Dialog, we need to end also.
                    PRINT 'ENDING CONVERSATION'
                    END CONVERSATION @dialog
                END -- IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') BEGIN
                ELSE BEGIN
                    -- Handle the Conversation Error Message
                    IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error') BEGIN
                        -- We can't return anything here because the dialog at the other end is closed so just log 
                        -- an error and close our end of the conversation.

                        PRINT 'ENDING CONVERSATION w/Error'
                        END CONVERSATION @dialog
                    END -- (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
                    ELSE BEGIN
                        IF (@message_type_name = 'SubmitJob')  BEGIN -- Process normal Job messages..   

                            PRINT 'pr_ProcessJob:: Message Type SubmitJob received.'

                            -- Pull the information out of the task message with XQuery
                            SELECT @JobID = @message_body.value('(/Job/JobID)[1]', 'int'),
                                @Params = @message_body.value('(/Job/Params)[1]', 'varchar(MAX)')

                            PRINT 'pr_ProcessJob::@JobID = ' + cast(@jobID as varchar(10))
                            PRINT 'pr_ProcessJob::@Params = ' + @Params


                            SELECT  @ErrorNumber = 0, @ErrorText = N''

                            -- Do something with the job
                            -- save state

                            -- we are looking for the first step 
                            SET @Step=1

                            PRINT 'Selecting from JobTask'
                            ---------------------------------------------------------
                            -- Get the next task
                            ---------------------------------------------------------
                            SELECT  TOP 1
                                @TaskID=task.TaskID, 
                                @TaskService=tt.TaskService, 
                                @TaskKey =Task.TaskKey
                            FROM JobTask task INNER JOIN TaskType tt
                                ON task.TaskTypeID = tt.TaskTypeID      
                            WHERE task.jobID=@JobID AND task.enabled=1 and task.step>=@step
                            ORDER BY Task.step
                            ---------------------------------------------------------
                            PRINT 'Selecting from JobTask: complete'

                            PRINT 'Step='+cast(@step as varchar(max))               
                            PRINT 'TaskID='+cast(@TaskID as varchar(max))
                            PRINT 'TaskService='+cast(@TaskService as varchar(max))
                            PRINT'TaskKey='+cast(@TaskKey as varchar(max))                      

                            PRINT 'BEGIN DIALOG with ' + @TaskService   

                            BEGIN DIALOG @chEngine
                                FROM SERVICE [JobService]
                                TO SERVICE @TaskService
                                ON CONTRACT [TaskContract]
                                WITH RELATED_CONVERSATION=@dialog;

                            PRINT 'BEGIN DIALOG with ' + @TaskService+' completed.'

                            SET @EngineMsg = CAST('<Task><TaskID>'+ str(@TaskID)+'</TaskID><TaskKey>'+ str(@Taskkey)+'</TaskKey></Task>' as XML);

                            PRINT CAST(@EngineMsg as varchar(max))

                            PRINT 'Sending Message Type SubmitTask to Engine.';

                            SEND ON CONVERSATION @chEngine
                                MESSAGE TYPE SubmitTask
                                (@EngineMsg)

                            PRINT 'Inserting into jobstate'

                            INSERT INTO JobState(cgid, jobch, jobID, step) VALUES(@cgid, @dialog, @jobid, @step)

                        END -- IF (@message_type_name = 'SubmitJob') 
                        ELSE BEGIN

                            IF (@message_type_name = 'TaskResponse')  BEGIN 

                                PRINT 'Processing MessageType TaskResponse'

                                SELECT @TaskStatus = @message_body.value('(/TaskStatus)[1]', 'int')

                                PRINT 'pr_ProcessJob::@TaskStatus = ' + cast(@TaskStatus as varchar(10))

                                PRINT 'Loading State'

                                --LoadState
                                SELECT  @JobID=jobid, 
                                    @Step=Step, 
                                    @jobch=jobch,
                                    @TimeStarted=sysdate    
                                FROM Jobstate
                                WHERE cgid=@cgid

                                PRINT 'Loading State complete' 

                                PRINT @jobch

                                PRINT 'Selecting from JobTask'
                                ---------------------------------------------------------
                                -- Get the next task
                                ---------------------------------------------------------
                                SELECT  TOP 1
                                    @TaskID=task.TaskID, 
                                    @TaskService=tt.TaskService, 
                                    @TaskKey =task.TaskKey,
                                    @NextStep = task.Step

                                FROM JobTask task INNER JOIN TaskType tt
                                    ON task.TaskTypeID = tt.TaskTypeID      
                                WHERE task.jobID=@JobID AND task.enabled=1 and task.step>@step
                                ORDER BY Task.step
                                ---------------------------------------------------------
                                PRINT 'Selecting from JobTask: complete'

                                PRINT 'NextTask: ['+@TaskService+']'

                                if (@TaskService is null) BEGIN

                                    PRINT '@TaskService is NULL: BEGIN'

                                    -- no more tasks
                                    --END CONVERSATION @jobch

                                    PRINT 'Removing from state table'                               
                                    DELETE FROM JobState 
                                    WHERE @cgid=cgid
                                    PRINT @@ROWCOUNT
                                    PRINT 'Removing from state table-completed'

                                    DECLARE @ResponseDoc xml

                                    -- Send a response message saying we're done
                                    DECLARE @Time nvarchar(100)                 
                                    SET @Time = cast(getdate() as nvarchar(100))

                                    DECLARE @TimeStartedText nvarchar(100)
                                    SET @TimeStartedText = cast(@TimeStarted as nvarchar(100))

                                    SET @ResponseDoc = N'<Job/>'
                                    SET @ResponseDoc.modify(
                                    'insert (<JobID>{ sql:variable("@JobID") }</JobID>, 
                                    <JobStatus>{ sql:variable("@ErrorNumber") }</JobStatus>,
                                    <ErrorNumber>{ sql:variable("@ErrorNumber") }</ErrorNumber>,
                                    <ErrorText>{ sql:variable("@ErrorText") }</ErrorText>,
                                    <TimeStarted>{ sql:variable("@TimeStartedText") }</TimeStarted>, 
                                    <TimeCompleted>{ sql:variable("@Time") }</TimeCompleted>) 
                                    as last into /Job [1]'); 

                                    SEND ON CONVERSATION @jobch 
                                        MESSAGE TYPE [JobResponse] (@ResponseDoc)

                                    END CONVERSATION @jobch             

                                    PRINT '@TaskService is NULL: END'

                                END --if (@TaskService is null) BEGIN
                                ELSE BEGIN
                                    -- there are more tasks 
                                    PRINT '@TaskService is not null: BEGIN'

                                    PRINT 'BEGIN DIALOG with ' + @TaskService

                                    --another task
                                    BEGIN DIALOG @chEngine
                                        FROM SERVICE [JobService]
                                        TO SERVICE @TaskService
                                        ON CONTRACT [TaskContract]
                                        WITH RELATED_CONVERSATION=@dialog;

                                    SET @EngineMsg = CAST('<Task><TaskID>'+ str(@TaskID)+'</TaskID><TaskKey>'+ str(@Taskkey)+'</TaskKey></Task>' as XML);

                                    PRINT 'SEND ' +cast(@EngineMsg as varchar(max));

                                    SEND ON CONVERSATION @chEngine
                                    MESSAGE TYPE SubmitTask (@EngineMsg)                                    

                                    PRINT 'SAVING State: ' +str(@step)

                                    -- save state
                                    Update JobState
                                        SET step = @NextStep
                                    FROM JobState
                                    WHERE cgid=@cgid

                                    PRINT '@TaskService is not null: END'

                                END -- ELSE (@TaskService is NOT NULL)

                                PRINT 'Processing MessageType TaskResponse...Complete'

                            END -- IF (@message_type_name = 'TaskCompleted')

                        END -- ELSE IF (@message_type_name <> 'JobRequest') 
                    END -- ELSE  (@message_type_name <> 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
                END -- ELSE (@message_type_name <> 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
            END -- WHILE (1=1) BEGIN

            PRINT 'COMMIT TRANSACTION'
            COMMIT TRANSACTION

        END TRY
        BEGIN CATCH

            --rollback transaction

            DECLARE @ErrNum int
            DECLARE @ErrMsg varchar(max)


                SELECT
                    ERROR_NUMBER() AS ErrorNumber
                    ,ERROR_SEVERITY() AS ErrorSeverity
                    ,ERROR_STATE() AS ErrorState
                    ,ERROR_PROCEDURE() AS ErrorProcedure
                    ,ERROR_LINE() AS ErrorLine
                    ,ERROR_MESSAGE() AS ErrorMessage;

            PRINT 'pr_ProcessJob: ROLLBACK (CATCH)'


            if (error_number()=1205) BEGIN
                -- a deadlock occurred. We can try it again.
                PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (CATCH)'
                ROLLBACK TRANSACTION
                --CONTINUE
            END --if (error_number()=1205)
            ELSE BEGIN
                if (error_number()=9617) BEGIN
                    PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (CATCH)'
                    ROLLBACK TRANSACTION
                END
                ELSE BEGIN -- (error_number()<>9617) 
                    -- another error occurred.  The message cant be procesed sucessfully
                    PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION to MessageReceivedSavePoint (CATCH)'
                    ROLLBACK TRANSACTION MessageReceivedSavePoint

                END --ELSE (error_number()<>9617)       
            END -- if (error_number()<>1205)
        END CATCH

    END -- while loop

PRINT 'pr_ProcessJob: Complete' 

END -- CREATE PROCEDURE [dbo].[ProcessJobProc]

1 Ответ

1 голос
/ 09 февраля 2010

Если последующие экземпляры вашего приложения могут видеть сообщение, это означает только одно: предыдущий экземпляр должен был откатить получение. На первый взгляд предоставленный вами код выглядит нормально, поэтому я пойду искать ошибки в используемой вами объектной модели. Если один из его методов вызывает исключение, приложение будет прервано, и транзакция, получившая сообщение, автоматически откатится.

Если вы хотите, чтобы одновременно работал только один экземпляр вашего приложения, оставьте параметр Max равным 1, иначе по умолчанию он будет запускать больше экземпляров одновременно, чтобы не отставать от нагрузки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...