Служба сервисной шины Azure.Получать сообщения в пакетном режиме - PullRequest
0 голосов
/ 19 февраля 2019

Я хотел бы получать сообщения от темы Azure ServiceBus в пакетном режиме.

Чтение https://docs.microsoft.com/en-us/azure/azure-functions/functions-best-practices в нем говорится:

Для функций C # вы можете изменитьвведите в строго типизированный массив.Например, вместо EventData sensorEvent подпись метода может быть EventData [] sensorEvent.

У меня есть метод:

public static void Run([ServiceBusTrigger("mytopic name", "MySubscription",
AccessRights.Listen, Connection = TopicService.ConnectionStringName)]
string messages, TraceWriter logger)

Этот метод работает, но он занимает 1 сообщение за один раз.

Согласно документации Microsoft, япросто можно изменить это на:

public static void Run([ServiceBusTrigger("mytopic name", "MySubscription",
AccessRights.Listen, Connection = TopicService.ConnectionStringName)]
string[] messages, TraceWriter logger)

И добавить следующее в файл host.json (https://docs.microsoft.com/en-us/azure/azure-functions/functions-host-json):

{
    "aggregator": {
        "batchSize": 10,
        "flushTimeout": "00:00:30"
    }
}

Но, запустив функцию, я получаю исключение:

mscorlib: Исключительная ситуация при выполнении функции: MyFunction. Microsoft.Azure.WebJobs.Host: Параметр привязки исключительной ситуации 'messages'. System.Runtime.Serialization: Произошла ошибка при десериализации объекта типа System.String []. Источник входного сигнала отформатирован неправильно. System.Runtime.Serialization: источник входного сигнала отформатирован неправильно.

Примечание. В теме и подписке включен параметр «Включить пакетные операции».

Что мне здесь не хватает?

1 Ответ

0 голосов
/ 19 февраля 2019

Вот код, который я пробовал.Проверьте и посмотрите, работает ли он.

//---------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.  

using Microsoft.Azure;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ServiceBusTriggers
{
    class Settings
    {
        public const string TopicPath = "sbperftopicwithpartitions";
        public const string Subscription = "sub_1";
        public const string ContainerName = "sbperf-test-store2";

        internal Settings()
        {
            ServiceBusConnectionString = GetSetting("ServiceBusConnectionString", UserSettings.ServiceBusConnectionString);
            StorageAccountConnectionString = GetSetting("StorageAccountConnectionString", UserSettings.StorageAccountConnectionString);
            AzureWebJobsDashboardConnectionString = GetSetting("AzureWebJobsDashboardConnectionString", UserSettings.AzureWebJobsDashboardConnectionString);
            AzureWebJobsStorageConnectionString = GetSetting("AzureWebJobsStorageConnectionString", UserSettings.AzureWebJobsStorageConnectionString);

            NLogDatabaseConnectionString = GetSetting("NLogDatabaseConnectionString", UserSettings.NLogDatabaseConnectionString);

            PrefetchCount = GetSetting("PrefetchCount", 100);
            MaxConcurrentCalls = GetSetting("MaxConcurrentCalls",100);

            MetricsDisplayFrequency = new TimeSpan(0, 0, 30); //every 30 seconds
            TokenSource = new CancellationTokenSource();
        }

        private int GetSetting(string name, int defaultValue)
        {
            int value;
            string valueStr = CloudConfigurationManager.GetSetting(name);
            if (!int.TryParse(valueStr, out value))
            {
                Console.WriteLine("Config missing for {0}. Using default.",name);
                value = defaultValue;
            }
            return value;
        }

        private string GetSetting(string name, string defaultValue)
        {
            string valueStr = CloudConfigurationManager.GetSetting(name);
            if (string.IsNullOrEmpty(valueStr))
            {
                Console.WriteLine("Config missing for {0}. Using default.", name);
                valueStr = defaultValue;
            }
            return valueStr;
        }

        public string ServiceBusConnectionString { get; set; }

        public string StorageAccountConnectionString { get; set;  }
       
        public int PrefetchCount { get; set; }

        public int MaxConcurrentCalls { get; set; }

        public TimeSpan MetricsDisplayFrequency { get; internal set; }

        public CancellationTokenSource TokenSource { get; set; }

        public string NLogDatabaseConnectionString { get; private set; }

        public static string AzureWebJobsDashboardConnectionString { get; internal set; }

        public static string AzureWebJobsStorageConnectionString { get; internal set; }

        public void WriteSettings()
        {
            ProjectLogger.Info("1|None|{1}|DisplayFrequency|{0}|", MetricsDisplayFrequency, Thread.CurrentThread.ManagedThreadId);
            ProjectLogger.Info("1|None|{1}|PrefetchCount|{0}|", PrefetchCount, Thread.CurrentThread.ManagedThreadId);
            ProjectLogger.Info("1|None|{1}|MaxConcurrentCalls|{0}|", MaxConcurrentCalls, Thread.CurrentThread.ManagedThreadId);
        }

    }
}

вот мой файл программы

//---------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.  
//
// THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND, 
// EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES 
// OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE. 
//---------------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Microsoft.Azure;

namespace ServiceBusTriggers
{
    // To learn more about Microsoft Azure WebJobs SDK, please see https://go.microsoft.com/fwlink/?LinkID=320976
    class Program
    {
        // Please set the following connection strings in app.config for this WebJob to run:
        // AzureWebJobsDashboard and AzureWebJobsStorage
        static void Main()
        {
            var config = new JobHostConfiguration();

            if (config.IsDevelopment)
            {
                config.UseDevelopmentSettings();
            }
            Settings settings = new Settings();
            config.DashboardConnectionString = Settings.AzureWebJobsDashboardConnectionString;
            config.StorageConnectionString = Settings.AzureWebJobsStorageConnectionString;

            ServiceBusConfiguration sbconfig = new ServiceBusConfiguration()
            {
                ConnectionString = settings.ServiceBusConnectionString,
                PrefetchCount = settings.PrefetchCount,
                MessageOptions = new OnMessageOptions
                {
                    MaxConcurrentCalls = settings.MaxConcurrentCalls
                }
            };

            config.UseServiceBus(sbconfig);

            Functions.Initialize(settings);
            var host = new JobHost(config);
            // The following code ensures that the WebJob will be running continuously
            host.RunAndBlock();
        }
    }
}

и вот мой файл функций

 public class Functions
    {
        static CloudStorageAccount storageAccount = null;
        static Metrics metrics = null;
        static Settings Settings = null;

        internal static void Initialize(Settings settings)
        {
            Settings = settings;
            Initialize();
        }

        static void Initialize()
        {
            ProjectLogger.Initialize(Settings);
            storageAccount = CloudStorageAccount.Parse(Settings.StorageAccountConnectionString);
            WriteMessageCount();
            metrics = new Metrics(Settings);
            metrics.StartMetricsTask(Settings.TokenSource.Token).Fork();
        }

        public static async Task ProcessTopicAsync(
                [ServiceBusTrigger(Settings.TopicPath, Settings.Subscription )]
                    BrokeredMessage message)
        {
            Stopwatch sw = Stopwatch.StartNew();
            await WriteToBlob(message);
            sw.Stop();
            metrics.IncreaseProcessMessages(1);
            metrics.IncreaseProcessBatch(1);
            metrics.IncreaseProcessLatency(sw.Elapsed.TotalMilliseconds);
        }

        private static async Task WriteToBlob(BrokeredMessage message)
        {
            var data = message.GetBody<byte[]>();
            var blobClient = storageAccount.CreateCloudBlobClient();
            CloudBlobContainer container = blobClient.GetContainerReference(Settings.ContainerName);
            await container.CreateIfNotExistsAsync();
            var blob = container.GetBlockBlobReference(Guid.NewGuid().ToString());
            await blob.UploadFromByteArrayAsync(data, 0, data.Length);
        }

        static void WriteMessageCount()
        {
            var namespaceManager = NamespaceManager.CreateFromConnectionString(Settings.ServiceBusConnectionString);
            var subscriptionDesc = namespaceManager.GetSubscription(Settings.TopicPath, Settings.Subscription);
            long subMessageCount = subscriptionDesc.MessageCount;
            ProjectLogger.Info("1|None|{1}|MessagesinSub|{0}|", subMessageCount, Thread.CurrentThread.ManagedThreadId);
        }
    }

Для получения дополнительной информации, вы можете просмотреть ниже github репо

https://github.com/tcsatheesh/samples/blob/master/ServiceBusTrigger/Functions.cs

Надеюсь, что это поможет.

...