Я установил экземпляр Event Hub с 20 единицами пропускной способности и 32 разделами на стандартном уровне. Согласно документации, каждая единица пропускной способности равна 1 МБ / с. Поэтому в идеале я должен получать пропускную способность 20 МБ / с или 1,2 ГБ / мин. Пространство имен имеет только один концентратор событий, и я единственный пользователь. Концентратор событий установлен в западной части США, и этот вариант наиболее близок к тому, откуда отправляются запросы.
Тем не менее, я вижу, что требуется не менее 10 минут для 1,77 ГБ данных. Я использую асинхронные пакетные вызовы и упаковываю каждый запрос до предела 1 МБ. Я вижу огромную разницу во времени, потребляемом вызовом SendBatchAsync - он варьируется от 0,15 до 25 секунд.
Вот мой код:
(Обратите внимание: я вынужден использовать .Net Framework 4.5)
static EventHubClient eventHubClient;
static Dictionary<int, List<EventData>> events = new Dictionary<int, List<EventData>>();
static Dictionary<int, long> batchSizes = new Dictionary<int, long>();
static long threshold = (long)(1e6 - 1000);
static SemaphoreSlim concurrencySemaphore;
static int maxConcurrency = 1;
static void Main()
{
eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
using (concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
foreach (string record in GetRecords())
{
Tuple<int, EventData> currentEventDetails = GetEventData(record);
int partitionId = currentEventDetails.Item1;
EventData currentEvent = currentEventDetails.Item2;
BatchOrSendAsync(partitionId, currentEvent);
}
SendRemainingAsync();
}
stopWatch.Stop();
Console.WriteLine(string.Format("### total time taken = {0}", stopWatch.Elapsed.TotalSeconds.ToString()));
}
static async void BatchOrSendAsync(int partitionId, EventData currentEvent)
{
long batchSize = 0;
batchSizes.TryGetValue(partitionId, out batchSize);
long currentEventSize = currentEvent.SerializedSizeInBytes;
if( batchSize + currentEventSize > threshold)
{
List<EventData> eventsToSend = events[partitionId];
if (eventsToSend == null || eventsToSend.Count == 0)
{
if (currentEventSize > threshold)
throw new Exception("found event with size above threshold");
return;
}
concurrencySemaphore.Wait();
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
await eventHubClient.SendBatchAsync(eventsToSend);
stopWatch.Stop();
Console.WriteLine(stopWatch.Elapsed.TotalSeconds.ToString());
concurrencySemaphore.Release();
events[partitionId] = new List<EventData> { currentEvent };
batchSizes[partitionId] = currentEventSize;
}
else
{
if (!events.ContainsKey(partitionId))
{
events[partitionId] = new List<EventData>();
batchSizes[partitionId] = 0;
}
events[partitionId].Add(currentEvent);
batchSizes[partitionId] += currentEventSize;
}
}
static async void SendRemainingAsync()
{
foreach(int partitionId in events.Keys)
{
concurrencySemaphore.Wait();
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
await eventHubClient.SendBatchAsync(events[partitionId]);
stopWatch.Stop();
Console.WriteLine(stopWatch.Elapsed.TotalSeconds.ToString());
concurrencySemaphore.Release();
}
}
Примечание: увеличение maxConcurrency для семафора ухудшает только общее время, и вызов SendBatchAsync начинает давать сбой, когда maxConcurrency равно 10
Что я должен сделать, чтобы улучшить пропускную способность?