Как добавить сообщение в служебную шину Azure из функции Azure? - PullRequest
0 голосов
/ 09 декабря 2018

Как добавить сообщение в служебную шину, если у меня уже есть сообщение в формате json.Я могу добавить сообщение, используя привязку вывода функции Azure, но ни одно из свойств сообщения не отображается ни в servicebusexplorer, ни в queueexplorer.

Мне нужно повторно отправить около 1 КБ сообщений, произошла ошибка в сообщениях, поэтому я экспортировал их в файл, исправил в notepad ++ и теперь создал функцию Azure, которая читает файл и помещает его вочередь.Но когда я смотрю на сообщение, ни одно из свойств сообщения не отображается в servicebusexploerer.

run.csx

#r "Newtonsoft.Json"

using System.Net;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Threading.Tasks;
using System.Configuration;

const string QueueName = "commands";
static string FileName = "messages.json";

public static async Task<string> Run(HttpRequest req, ILogger log,
                 ExecutionContext context, ICollector<string> outputSbQueue)
{
    log.LogInformation("Starting processing messages.");

    var filePath = System.IO.Path.Combine(context.FunctionDirectory, FileName);

    log.LogInformation("Path: " + filePath);

    var text = File.ReadAllText(filePath);

    log.LogInformation("Message: " + text);

    JArray messages = JArray.Parse(text);

    log.LogInformation("Number of message: " + messages.Count);

    await SendMessagesAsync(messages,log,outputSbQueue);
    // return req.CreateResponse(HttpStatusCode.OK,
    //                             "Updated",
    //                             "text/plain");
    return "test";
}

static async Task SendMessagesAsync(JArray messages, ILogger log, 
ICollector<string> outputSbQueue )
{
    log.LogInformation("About to iterate messages");

    foreach (var message in messages)
    {
        log.LogInformation("Sending Message");
        outputSbQueue.Add(message.ToString());
        log.LogInformation("Sent message: " + message);
    }
}

messages.json

[
  {
    "Body": {
      "PaymentPlanId": "2141110b-07da-46b7-a166-ffc7f9f6c5af",
      "InstallmentId": "3bd27b0d-3372-456c-856c-74e09de1413a",
      "Date": "2018-12-05T00:00:00",
      "Amount": 66.89,
      "Attempt": 0,
      "PaymentCorrelationId": "2ae7511e-706f-4d7f-b44b-9690d0fcbf38",
      "CommandId": "a2d5ae26-6289-4cca-bce0-7a1905b64378"
    },
    "ContentType": "text/plain",
    "CorrelationId": null,
    "DeadLetterSource": "commands",
    "DeliveryCount": 1,
    "EnqueuedSequenceNumber": 14684,
    "EnqueuedTimeUtc": "2018-12-06T13:22:37.131Z",
    "ExpiresAtUtc": "9999-12-31T23:59:59.9999999",
    "ForcePersistence": false,
    "IsBodyConsumed": false,
    "Label": "PayDueInstallmentCommand",
    "LockedUntilUtc": null,
    "LockToken": null,
    "MessageId": "a2d5ae26-6289-4cca-bce0-7a1905b64378",
    "PartitionKey": null,
    "Properties": {
      "BodyClrType": "SR.Domain.Commands.PayDueInstallmentCommand, SR.Domain, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
      "ParentId": "|Dz4Pxv65XMA=.3975a8a2_32.",
      "RootId": "Dz4Pxv65XMA=",
      "Diagnostic-Id": "|Dz4Pxv65XMA=.3975a8a2_32.1.",
      "DeadLetterReason": "NoCommandInMessage",
      "DeadLetterErrorDescription": "There was no command in the message.",
      "Test":"1"
    },
    "ReplyTo": null,
    "ReplyToSessionId": null,
    "ScheduledEnqueueTimeUtc": "2018-12-06T13:22:36.877Z",
    "SequenceNumber": 14684,
    "SessionId": null,
    "Size": 938,
    "State": 0,
    "TimeToLive": "10675199.02:48:05.4775807",
    "To": null,
    "ViaPartitionKey": null
  }
 ]

function.json

{
  "bindings": [
    {
      "authLevel": "function",
      "name": "req",
      "type": "httpTrigger",
      "direction": "in",
      "methods": [
        "get",
        "post"
      ]
    },
    {
      "name": "$return",
      "type": "http",
      "direction": "out"
    },
    {
      "name": "outputSbQueue",
      "type": "serviceBus",
      "queueName": "deadletter",
      "connection": "ServiceBusConnectionString",
      "direction": "out"
    }
  ],
  "disabled": false
}

1 Ответ

0 голосов
/ 10 декабря 2018

Проблема сборки легко решается, как упоминал @Roman.Так как вы создали функцию v2 (по умолчанию, если вы не изменяли версию нового приложения Function во время выполнения), добавьте команду ниже.

#r "..\\bin\\Microsoft.Azure.ServiceBus.dll" 
using Microsoft.Azure.ServiceBus;

Другая проблема - это структура вашей модели JSON.На самом деле он основан на BrokeredMessage в Microsoft.ServiceBus.Messaging вместо Message в Microsoft.Azure.ServiceBus.Возможно, вам придется решить, какой из них использовать, и при необходимости провести рефакторинг Json.Обратите внимание, что некоторые свойства устанавливаются службой служебной шины Azure, которую мы не можем изменить в новых созданных сообщениях.

Сделайте пример Message.Рефакторинг JSON в соответствии с классом Message, включая все настраиваемые свойства.

[
  {
    "Body": {
      "PaymentPlanId": "2141110b-07da-46b7-a166-ffc7f9f6c5af",
      ...
    },
    "ContentType": "text/plain",
    "Label": "MyLable",
    "MessageId": "a2d5ae26-6289-4cca-bce0-7a1905b64378",
    "ScheduledEnqueueTimeUtc": "2018-12-06T13:22:36.877Z",
    "TimeToLive": "10675199.02:48:05.4775807",
    "CorrelationId": null,
    "PartitionKey": null,
    "ReplyTo": null,
    "ReplyToSessionId": null,
    "SessionId": null,
    "To": null,
    "ViaPartitionKey": null
    "UserProperties": {
        "CustomProperty":"test",
        ...
    }
  }
 ]

Мы не можем напрямую использовать десериализацию, например JsonConvert.DeserializeObject, поскольку тело сообщения требует byte[].

        foreach (var message in messages)
        {
            // Get Body first
            var body = System.Text.Encoding.UTF8.GetBytes(message["Body"].ToString());
            // Empty Body section to deserialize properties
            message["Body"] = "";
            var SBMessage = JsonConvert.DeserializeObject<Message>(message.ToString());
            SBMessage.Body = body;
            log.LogInformation("Sending Message");
            outputSbQueue.Add(SBMessage);
            log.LogInformation("Sent message: " + SBMessage.MessageId);
        }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...