Я бы посоветовал вам использовать Пакет для отправки сообщений.
Клиент Azure Service Bus поддерживает отправку сообщений в пакетном режиме (методы SendBatch и SendBatchAsync QueueClient и TopicClient).Однако размер одной партии должен оставаться ниже 256 Кбайт, иначе вся партия будет отклонена.
Мы начнем с простого варианта использования: размер каждого сообщения нам известен.Это определяется гипотетической функцией getCize Func.Вот полезный метод расширения, который разбивает произвольную коллекцию на основе метрической функции и максимального размера чанка:
public static List<List<T>> ChunkBy<T>(this IEnumerable<T> source, Func<T, long> metric, long maxChunkSize)
{
return source
.Aggregate(
new
{
Sum = 0L,
Current = (List<T>)null,
Result = new List<List<T>>()
},
(agg, item) =>
{
var value = metric(item);
if (agg.Current == null || agg.Sum + value > maxChunkSize)
{
var current = new List<T> { item };
agg.Result.Add(current);
return new { Sum = value, Current = current, agg.Result };
}
agg.Current.Add(item);
return new { Sum = agg.Sum + value, agg.Current, agg.Result };
})
.Result;
}
Теперь реализация SendBigBatchAsync проста:
public async Task SendBigBatchAsync(IEnumerable<T> messages, Func<T, long> getSize)
{
var chunks = messages.ChunkBy(getSize, MaxServiceBusMessage);
foreach (var chunk in chunks)
{
var brokeredMessages = chunk.Select(m => new BrokeredMessage(m));
await client.SendBatchAsync(brokeredMessages);
}
}
private const long MaxServiceBusMessage = 256000;
private readonly QueueClient client;
howмы определяем размер каждого сообщения?Как реализовать функцию getSize?
Класс BrokeredMessage предоставляет свойство Size, поэтому может возникнуть соблазн переписать наш метод следующим образом:
public async Task SendBigBatchAsync<T>(IEnumerable<T> messages)
{
var brokeredMessages = messages.Select(m => new BrokeredMessage(m));
var chunks = brokeredMessages.ChunkBy(bm => bm.Size, MaxServiceBusMessage);
foreach (var chunk in chunks)
{
await client.SendBatchAsync(chunk);
}
}
Последняя возможность, которую я хочу рассмотреть,на самом деле вы можете нарушить максимальный размер пакета, но затем обработать исключение, повторить операцию отправки и скорректировать будущие вычисления на основе фактического измеренного размера сообщений с ошибками.Размер известен после попытки SendBatch, даже если операция не удалась, поэтому мы можем использовать эту информацию.
// Sender is reused across requests
public class BatchSender
{
private readonly QueueClient queueClient;
private long batchSizeLimit = 262000;
private long headerSizeEstimate = 54; // start with the smallest header possible
public BatchSender(QueueClient queueClient)
{
this.queueClient = queueClient;
}
public async Task SendBigBatchAsync<T>(IEnumerable<T> messages)
{
var packets = (from m in messages
let bm = new BrokeredMessage(m)
select new { Source = m, Brokered = bm, BodySize = bm.Size }).ToList();
var chunks = packets.ChunkBy(p => this.headerSizeEstimate + p.Brokered.Size, this.batchSizeLimit);
foreach (var chunk in chunks)
{
try
{
await this.queueClient.SendBatchAsync(chunk.Select(p => p.Brokered));
}
catch (MessageSizeExceededException)
{
var maxHeader = packets.Max(p => p.Brokered.Size - p.BodySize);
if (maxHeader > this.headerSizeEstimate)
{
// If failed messages had bigger headers, remember this header size
// as max observed and use it in future calculations
this.headerSizeEstimate = maxHeader;
}
else
{
// Reduce max batch size to 95% of current value
this.batchSizeLimit = (long)(this.batchSizeLimit * .95);
}
// Re-send the failed chunk
await this.SendBigBatchAsync(packets.Select(p => p.Source));
}
}
}
}
Вы можете использовать этот блог для дальнейшей ссылки .Надеюсь, это поможет.