Исключения тайм-аута при вызове RegisterMessageHandler для SubscriptionClient - PullRequest
0 голосов
/ 05 февраля 2020

Я пытаюсь вызвать метод RegisterMessageHandler для экземпляра SubscriptionClient, запущенного в Android.

Примерно через 20 секунд после публикации сообщения в указанный c topi c (через автоматический тест), я получаю несколько исключений тайм-аута в моем приложении Android, которое фактически содержит RegisterMessageHandler для экземпляра SubscriptionClient.

Окно вывода

Вот ошибка:

** System.TimeoutException: ** 'timeout'

Регистрация

Приведенный ниже код создает экземпляр SubsciptionClient и затем регистрируется обработчик сообщений:

member x.Enable () =

    async {

        if not (isEnabled) then

            subscriptionClient <- new SubscriptionClient(connectionString, "Topic.courier-requested", "Subscription.all-messages")
            subscriptionClient.OperationTimeout <- TimeSpan.FromMinutes(1.0)

            let! rulesFound     = subscriptionClient.GetRulesAsync() |> Async.AwaitTask
            let  hasDefaultRule = rulesFound.Any(fun r -> r.Name = RuleDescription.DefaultRuleName)

            if hasDefaultRule then
                do! subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName) |> Async.AwaitTask

            else
                let msgOptions = MessageHandlerOptions(fun args -> exceptionReceivedHandler(args))
                msgOptions.AutoComplete         <- false
                msgOptions.MaxAutoRenewDuration <- TimeSpan.FromMinutes(1.0)
                msgOptions.MaxConcurrentCalls   <- 1

                subscriptionClient.RegisterMessageHandler(processMessageAsync, msgOptions)

    } |> Async.StartAsTask

Обработчик сообщений

Код обработчика сообщений никогда не выполняется. Вместо этого я наблюдаю несколько предупреждений об исключении тайм-аута, а также сообщения о тайм-ауте в моем окне вывода.

Вот код, который я зарегистрировал с помощью RegisterMessageHandler:

let processMessageAsync (message:Message) (_:CancellationToken) = 

    let json    = Encoding.UTF8.GetString(message.Body)
    ...

    subscriptionClient.CompleteAsync(message.SystemProperties.LockToken) |> Async.AwaitTask |> Async.RunSynchronously
    Task.CompletedTask

Android Test - Fails

Следующий тест публикует сообщение, которое вызывает исключение тайм-аута в моем приложении Android примерно через 20 секунд:

[<Fact>]
let ``Publish message courier-requested message to servicebus topic``() =

    // Setup
    let connectionstring = ConfigurationManager.ConnectionStrings.["servicebus_testEnv"].ConnectionString

    // Test
    async {

        let client    = TopicClient(connectionstring, "Topic.courier-requested")
        let data      = "test_data"
        let message   = Message(Encoding.UTF8.GetBytes(data))
        let courierId = "b965f552-31a4-4644-a9c6-d86dd45314c4"
        message.Label <- sprintf "courier-id(%s)" courierId

        do! client.SendAsync(message) |> Async.AwaitTask
        do! client.CloseAsync()       |> Async.AwaitTask
    }

Console Test - Passes

Я могу успешно запустить тот же код подписки в консольном приложении (без таймаутов):

[<EntryPoint>]
let main argv =

    printfn "Welcome to Subscription.Console"

    async {

        let subscriber = Subscriber(connectionString)
        do! subscriber.Listen()

    } |> Async.RunSynchronously


    Console.ReadKey() |> ignore
    0 // return an integer exit code

Вот реализация:

type Subscriber(connectionString:string) =

    let mutable subscriptionClient : SubscriptionClient = null

    let exceptionReceivedHandler (args:ExceptionReceivedEventArgs) =
        printfn "Got an exception: %A" args.Exception
        Task.CompletedTask

    let processMessageAsync (message:Message) (_:CancellationToken) = 

        let json = Encoding.UTF8.GetString(message.Body)

        subscriptionClient.CompleteAsync(message.SystemProperties.LockToken) |> Async.AwaitTask |> Async.RunSynchronously
        Task.CompletedTask

    member x.Listen() =

        async {

            subscriptionClient <- new SubscriptionClient(connectionString, "Topic.courier-requested", "Subscription.all-messages")
            subscriptionClient.OperationTimeout <- TimeSpan.FromMinutes(1.0)

            let! rulesFound     = subscriptionClient.GetRulesAsync() |> Async.AwaitTask
            let  hasDefaultRule = rulesFound.Any(fun r -> r.Name = RuleDescription.DefaultRuleName)

            if hasDefaultRule then
                do! subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName) |> Async.AwaitTask

            else

                let msgOptions = MessageHandlerOptions(fun args -> exceptionReceivedHandler(args))
                msgOptions.AutoComplete         <- false
                msgOptions.MaxAutoRenewDuration <- TimeSpan.FromMinutes(1.0)
                msgOptions.MaxConcurrentCalls   <- 1

                subscriptionClient.RegisterMessageHandler(processMessageAsync, msgOptions)
        }

Приложение

Я применил к строке подключения следующее:

TransportType=AmqpWebSockets;

Я ссылался на следующие ссылки:

https://github.com/Azure/azure-service-bus-dotnet/issues/529

Azure Блокировка сообщения служебной шины не обновляется?

1 Ответ

1 голос
/ 06 февраля 2020

Эмулятор Android не был подключен к inte rnet.

...