SQL Queue перестает работать и заполняется сообщениями - PullRequest
3 голосов
/ 13 января 2012

В настоящее время я борюсь с брокером SQL.Кажется, все настроено хорошо, но очередь перестает работать и заполняется неотправленными сообщениями.Посредник и очередь включены.Если я отбрасываю очередь и службу и воссоздаю их, это работает некоторое время, но останавливается снова позже.Я не вижу важной ошибки в журнале сервера sql.Так что может вызвать ошибку?

Спасибо

ОЧЕРЕДЬ

CREATE QUEUE [dbo].[DataChangeQueue] WITH STATUS = ON , RETENTION = OFF , ACTIVATION (  STATUS = ON , PROCEDURE_NAME = [dbo].[DataChangeQueueProc] , MAX_QUEUE_READERS = 100 , EXECUTE AS N'dbo'), POISON_MESSAGE_HANDLING (STATUS = ON)  ON [PRIMARY]

СЕРВИС

CREATE SERVICE [DataChangeService]  AUTHORIZATION [dbo]  ON QUEUE [dbo].[DataChangeQueue] ([http://schemas.microsoft.com/SQL/Notifications/PostQueryNotification])

C #

    public DatabaseNotificationService()
    {
        SqlDependency.Start(m_SQLConnectionString, "DataChangeQueue");

        if (IsAccessGranted())
        {
            ConnectToDatabase();
        }
    }

    ~DatabaseNotificationService()
    {
        SqlDependency.Stop(m_SQLConnectionString, "DataChangeQueue");
    }


    private void ConnectToDatabase()
    {
        using (SqlConnection sqlConnection = new SqlConnection(m_SQLConnectionString))
        {
            sqlConnection.Open();

            using (SqlCommand sqlCommand = sqlConnection.CreateCommand())
            {
                sqlCommand.CommandType = CommandType.Text;
                sqlCommand.CommandText = GetSQLCommandText();
                sqlCommand.Notification = null;

                if (m_SQLDependency != null)
                {
                    m_SQLDependency.OnChange -= DependencyOnChange;
                    m_SQLDependency = null;
                }

                m_SQLDependency = new SqlDependency(sqlCommand, "Service=DataChangeService;Local Database=aspnetdb", 1800);
                m_SQLDependency.OnChange += DependencyOnChange;

                sqlCommand.ExecuteReader();
            }

            sqlConnection.Close();
        }
    }


    private void DependencyOnChange(object sender, SqlNotificationEventArgs e)
    {
        using (SqlConnection sqlConnection = new SqlConnection(m_SQLConnectionString))
        {
            sqlConnection.Open();

            using (SqlCommand cmd2 = sqlConnection.CreateCommand())
            {
                cmd2.CommandType = CommandType.Text;
                cmd2.CommandText = GetOnChangeSQLCommandText();

                using (SqlDataReader sqlDataReader = cmd2.ExecuteReader())
                {
                    if (sqlDataReader != null)
                    {
                        sqlDataReader.Read();

                        List<String> keys = new List<String>(m_Clients.Keys);
                        foreach (String key in keys)
                        {
                            IDatabaseNotificationCallbackContract client;
                            if (m_Clients.TryGetValue(key, out client))
                            {
                                if (((ICommunicationObject)client).State == CommunicationState.Opened)
                                {
                                    client.SendNotificationToClients(sqlDataReader.GetValue(0).ToString());
                                }
                                else
                                {
                                    m_Clients.Remove(key);
                                }
                            }                    
                        }
                    }
                }
            }

            sqlConnection.Close();
        }

        if (m_SQLDependency != null)
        {
            m_SQLDependency.OnChange -= DependencyOnChange;
            m_SQLDependency = null;
        }

        //Reconnect to database for listening to following changes.
        ConnectToDatabase();
    }

1 Ответ

0 голосов
/ 25 января 2013

Когда очередь «останавливается», она все еще включена?

Если нет, вы можете иметь дело с ядовитым сообщением в вашей очереди.Они могут прекратить обработку

Я рекомендую проверить первое сообщение и посмотреть, является ли оно действительным для вашей обработки.

Если это не так, тогда вытащите его.код, который я использовал бы для удаления первого разговора (и, следовательно, ядовитого сообщения) из очереди сообщений:

BEGIN TRANSACTION;

DECLARE @handle nvarchar(100);
RECEIVE TOP(1) @handle = [conversation_handle] FROM yourQueueName;

END CONVERSATION (@handle)
    WITH ERROR = 127 DESCRIPTION = N'Unable to process message.' ;
GO

COMMIT TRANSACTION;
...