Отправка сообщений в масштабе на служебную шину из надежных функций - PullRequest
0 голосов
/ 17 июня 2019

У меня есть сценарий, в котором одна функция действия извлекает набор записей, который может быть от 1000 до миллиона, и сохраняется в объекте. Этот объект затем используется функцией следующего действия для параллельной отправки сообщений на служебную шину.

В настоящее время я использую цикл for для этого объекта для отправки каждой записи в объекте на служебную шину. Пожалуйста, дайте мне знать, если есть лучший альтернативный шаблон, где объект или контент (где бы он ни хранился) очищался для отправки на служебную шину, и функция автоматически масштабировалась, не ограничивая обработку циклом for.

  • Использовали цикл for из функции, которая оркестрирует для вызова функций действия для записей в объекте.
  • Посмотрели масштабирование функции активности, и для набора из 18000 записей она масштабировалась до 15 экземпляров и обработала весь набор за 4 минуты.
  • В настоящее время функция использует план потребления. Проверено, чтобы убедиться, что только это приложение-функция использует этот план, и его нельзя использовать совместно.
  • В теме, в которую отправляется сообщение, есть другая служба, которая прослушивает сообщение.
  • Число экземпляров как для функции оркестровки, так и для функции активности доступно по умолчанию.
    for(int i=0;i<number_messages;i++)
    {
       taskList[i] = 
    context.CallActivityAsync<string>("Sendtoservicebus", 
       (messages[i],runId,CorrelationId,Code));
     }

    try
     {
      await Task.WhenAll(taskList);
     }
    catch (AggregateException ae)
     {
      ae.Flatten();
     }

Сообщения должны быть быстро отправлены на служебную шину путем соответствующего масштабирования функций активности.

1 Ответ

1 голос
/ 18 июня 2019

Я бы посоветовал вам использовать Пакет для отправки сообщений.

Клиент Azure Service Bus поддерживает отправку сообщений в пакетном режиме (методы SendBatch и SendBatchAsync QueueClient и TopicClient).Однако размер одной партии должен оставаться ниже 256 Кбайт, иначе вся партия будет отклонена.

Мы начнем с простого варианта использования: размер каждого сообщения нам известен.Это определяется гипотетической функцией getCize Func.Вот полезный метод расширения, который разбивает произвольную коллекцию на основе метрической функции и максимального размера чанка:

public static List<List<T>> ChunkBy<T>(this IEnumerable<T> source, Func<T, long> metric, long maxChunkSize)
{
    return source
        .Aggregate(
            new
            {
                Sum = 0L,
                Current = (List<T>)null,
                Result = new List<List<T>>()
            },
            (agg, item) =>
            {
                var value = metric(item);
                if (agg.Current == null || agg.Sum + value > maxChunkSize)
                {
                    var current = new List<T> { item };
                    agg.Result.Add(current);
                    return new { Sum = value, Current = current, agg.Result };
                }

                agg.Current.Add(item);
                return new { Sum = agg.Sum + value, agg.Current, agg.Result };
            })
        .Result;
}

Теперь реализация SendBigBatchAsync проста:

public async Task SendBigBatchAsync(IEnumerable<T> messages, Func<T, long> getSize)
{
    var chunks = messages.ChunkBy(getSize, MaxServiceBusMessage);
    foreach (var chunk in chunks)
    {
        var brokeredMessages = chunk.Select(m => new BrokeredMessage(m));
        await client.SendBatchAsync(brokeredMessages);
    }
}

private const long MaxServiceBusMessage = 256000;
private readonly QueueClient client;

howмы определяем размер каждого сообщения?Как реализовать функцию getSize?

Класс BrokeredMessage предоставляет свойство Size, поэтому может возникнуть соблазн переписать наш метод следующим образом:

public async Task SendBigBatchAsync<T>(IEnumerable<T> messages)
{
    var brokeredMessages = messages.Select(m => new BrokeredMessage(m));
    var chunks = brokeredMessages.ChunkBy(bm => bm.Size, MaxServiceBusMessage);
    foreach (var chunk in chunks)
    {
        await client.SendBatchAsync(chunk);
    }
}

Последняя возможность, которую я хочу рассмотреть,на самом деле вы можете нарушить максимальный размер пакета, но затем обработать исключение, повторить операцию отправки и скорректировать будущие вычисления на основе фактического измеренного размера сообщений с ошибками.Размер известен после попытки SendBatch, даже если операция не удалась, поэтому мы можем использовать эту информацию.

// Sender is reused across requests
public class BatchSender
{
    private readonly QueueClient queueClient;
    private long batchSizeLimit = 262000;
    private long headerSizeEstimate = 54; // start with the smallest header possible

    public BatchSender(QueueClient queueClient)
    {
        this.queueClient = queueClient;
    }

    public async Task SendBigBatchAsync<T>(IEnumerable<T> messages)
    {
        var packets = (from m in messages
                     let bm = new BrokeredMessage(m)
                     select new { Source = m, Brokered = bm, BodySize = bm.Size }).ToList();
        var chunks = packets.ChunkBy(p => this.headerSizeEstimate + p.Brokered.Size, this.batchSizeLimit);
        foreach (var chunk in chunks)
        {
            try
            {
                await this.queueClient.SendBatchAsync(chunk.Select(p => p.Brokered));
            }
            catch (MessageSizeExceededException)
            {
                var maxHeader = packets.Max(p => p.Brokered.Size - p.BodySize);
                if (maxHeader > this.headerSizeEstimate)
                {
                    // If failed messages had bigger headers, remember this header size 
                    // as max observed and use it in future calculations
                    this.headerSizeEstimate = maxHeader;
                }
                else
                {
                    // Reduce max batch size to 95% of current value
                    this.batchSizeLimit = (long)(this.batchSizeLimit * .95);
                }

                // Re-send the failed chunk
                await this.SendBigBatchAsync(packets.Select(p => p.Source));
            }

        }
    }
}

Вы можете использовать этот блог для дальнейшей ссылки .Надеюсь, это поможет.

...