Я пытаюсь вызвать метод RegisterMessageHandler для экземпляра SubscriptionClient, запущенного в Android.
Примерно через 20 секунд после публикации сообщения в указанный c topi c (через автоматический тест), я получаю несколько исключений тайм-аута в моем приложении Android, которое фактически содержит RegisterMessageHandler для экземпляра SubscriptionClient.
Окно вывода
Вот ошибка:
** System.TimeoutException: ** 'timeout'
Регистрация
Приведенный ниже код создает экземпляр SubsciptionClient и затем регистрируется обработчик сообщений:
member x.Enable () =
async {
if not (isEnabled) then
subscriptionClient <- new SubscriptionClient(connectionString, "Topic.courier-requested", "Subscription.all-messages")
subscriptionClient.OperationTimeout <- TimeSpan.FromMinutes(1.0)
let! rulesFound = subscriptionClient.GetRulesAsync() |> Async.AwaitTask
let hasDefaultRule = rulesFound.Any(fun r -> r.Name = RuleDescription.DefaultRuleName)
if hasDefaultRule then
do! subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName) |> Async.AwaitTask
else
let msgOptions = MessageHandlerOptions(fun args -> exceptionReceivedHandler(args))
msgOptions.AutoComplete <- false
msgOptions.MaxAutoRenewDuration <- TimeSpan.FromMinutes(1.0)
msgOptions.MaxConcurrentCalls <- 1
subscriptionClient.RegisterMessageHandler(processMessageAsync, msgOptions)
} |> Async.StartAsTask
Обработчик сообщений
Код обработчика сообщений никогда не выполняется. Вместо этого я наблюдаю несколько предупреждений об исключении тайм-аута, а также сообщения о тайм-ауте в моем окне вывода.
Вот код, который я зарегистрировал с помощью RegisterMessageHandler:
let processMessageAsync (message:Message) (_:CancellationToken) =
let json = Encoding.UTF8.GetString(message.Body)
...
subscriptionClient.CompleteAsync(message.SystemProperties.LockToken) |> Async.AwaitTask |> Async.RunSynchronously
Task.CompletedTask
Android Test - Fails
Следующий тест публикует сообщение, которое вызывает исключение тайм-аута в моем приложении Android примерно через 20 секунд:
[<Fact>]
let ``Publish message courier-requested message to servicebus topic``() =
// Setup
let connectionstring = ConfigurationManager.ConnectionStrings.["servicebus_testEnv"].ConnectionString
// Test
async {
let client = TopicClient(connectionstring, "Topic.courier-requested")
let data = "test_data"
let message = Message(Encoding.UTF8.GetBytes(data))
let courierId = "b965f552-31a4-4644-a9c6-d86dd45314c4"
message.Label <- sprintf "courier-id(%s)" courierId
do! client.SendAsync(message) |> Async.AwaitTask
do! client.CloseAsync() |> Async.AwaitTask
}
Console Test - Passes
Я могу успешно запустить тот же код подписки в консольном приложении (без таймаутов):
[<EntryPoint>]
let main argv =
printfn "Welcome to Subscription.Console"
async {
let subscriber = Subscriber(connectionString)
do! subscriber.Listen()
} |> Async.RunSynchronously
Console.ReadKey() |> ignore
0 // return an integer exit code
Вот реализация:
type Subscriber(connectionString:string) =
let mutable subscriptionClient : SubscriptionClient = null
let exceptionReceivedHandler (args:ExceptionReceivedEventArgs) =
printfn "Got an exception: %A" args.Exception
Task.CompletedTask
let processMessageAsync (message:Message) (_:CancellationToken) =
let json = Encoding.UTF8.GetString(message.Body)
subscriptionClient.CompleteAsync(message.SystemProperties.LockToken) |> Async.AwaitTask |> Async.RunSynchronously
Task.CompletedTask
member x.Listen() =
async {
subscriptionClient <- new SubscriptionClient(connectionString, "Topic.courier-requested", "Subscription.all-messages")
subscriptionClient.OperationTimeout <- TimeSpan.FromMinutes(1.0)
let! rulesFound = subscriptionClient.GetRulesAsync() |> Async.AwaitTask
let hasDefaultRule = rulesFound.Any(fun r -> r.Name = RuleDescription.DefaultRuleName)
if hasDefaultRule then
do! subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName) |> Async.AwaitTask
else
let msgOptions = MessageHandlerOptions(fun args -> exceptionReceivedHandler(args))
msgOptions.AutoComplete <- false
msgOptions.MaxAutoRenewDuration <- TimeSpan.FromMinutes(1.0)
msgOptions.MaxConcurrentCalls <- 1
subscriptionClient.RegisterMessageHandler(processMessageAsync, msgOptions)
}
Приложение
Я применил к строке подключения следующее:
TransportType=AmqpWebSockets;
Я ссылался на следующие ссылки:
https://github.com/Azure/azure-service-bus-dotnet/issues/529
Azure Блокировка сообщения служебной шины не обновляется?