Обработка сообщений сервисной шины в долговременных функциях - PullRequest
0 голосов
/ 17 апреля 2020

У меня есть длительная функция azure, которая содержит ряд функций активности. Эта функция вызывается сообщением на служебной шине topi c. Я хочу иметь возможность вручную заполнить / отложить / отменить сообщение служебной шины на основе результатов этих функций деятельности. Однако я не нахожу способ получить доступ к MessageReceiver из ServiceBusTrigger в функции триггера.

Я попытался передать MessageReceiver в качестве входных данных в функцию оркестровки, но это вызывает ошибки при повторном получении значение через context.GetInput<MessageReceiver>(), потому что MessageReceiver нельзя сериализовать в JSON.

Function 'ImportTransactionOrchestrator (Orchestrator)' failed with an error. Reason: Newtonsoft.Json.JsonSerializationException: Unable to find a constructor to use for type Microsoft.Azure.ServiceBus.Core.MessageReceiver. A class should either have a default constructor, one constructor with arguments or a constructor marked with the JsonConstructor attribute.

Вот краткое изложение того, что у меня есть:

Функция триггера

[FunctionName(nameof(ServiceBusTrigger))]
public async Task ServiceBusTrigger(
    [ServiceBusTrigger(topicName: "topic", subscriptionName: "sub",
        Connection = "connection")]
    Message serviceBusMessage,
    MessageReceiver messageReceiver,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log)
{
    var instanceId = await starter
        .StartNewAsync("ImportTransactionOrchestrator", null, (messageReceiver, serviceBusMessage));
}

Функция оркестровки

 [FunctionName(nameof(ImportTransactionOrchestrator))]
 public async Task RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context)
 {
     var (messageReceiver, serviceBusMessage ) = 
          context.GetInput<(MessageReceiver, Message)>(); // error occurs here
     {...}
 }

Мой поиск не сильно увеличился, поэтому я подозреваю, что я Я пытаюсь сделать что-то странное, может быть. Любая помощь будет оценена!

1 Ответ

0 голосов
/ 18 апреля 2020

Ваш Orchestrator должен нести ответственность за заполнение или нет сообщений. Я считаю, что messageReceiver не может быть сериализован и передан в качестве параметра вашим дочерним действиям.

Вам необходимо изменить для свойства autocomplete значение false на вашем хосте. json:

{
    "version": "2.0",
    "extensions": {
        "serviceBus": {
            "prefetchCount": 100,
            "messageHandlerOptions": {
                "autoComplete": false,
                "maxConcurrentCalls": 32,
                "maxAutoRenewDuration": "00:55:00"
            },
            "sessionHandlerOptions": {
                "autoComplete": false,
                "messageWaitTimeout": "00:00:30",
                "maxAutoRenewDuration": "00:55:00",
                "maxConcurrentSessions": 16
            }
        }
    }
}

и завершить сообщение:

await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...