Как избежать состояния гонки при обновлении записи хранилища таблиц Azure - PullRequest
2 голосов
/ 08 июня 2019

Функция Azure, использующая хранилище таблиц Azure

У меня есть функция Azure, которая запускается из тематической подписки Azure Service Bus, назовем ее функцией «Информация о файле процесса».

Сообщение в подписке содержит информацию о файле для обработки. Нечто похожее на это:

{
  "uniqueFileId": "adjsdakajksajkskjdasd",
  "fileName":"mydocument.docx",
  "sourceSystemRef":"System1",
  "sizeBytes": 1024,
  ... and other data
}

Функция выполняет следующие две операции -

  1. Проверка таблицы хранения отдельных файлов на предмет наличия файла. Если он существует, обновите этот файл. Если он новый, добавьте файл в таблицу хранения (хранится в для каждой системы | для каждого идентификатора файла ).

  2. Захват метрик в байтах размера файла и сохранение во второй таблице хранения, называемой метриками (с постоянным приращением байтов, хранящейся в на систему | в год / месяц базис).

Следующая диаграмма дает краткое изложение моего подхода:

enter image description here

Разница между таблицей individualFileInfo и fileMetric заключается в том, что в отдельной таблице имеется одна запись на файл, где в таблице метрик хранится одна запись в месяц, которая постоянно обновляется ( увеличивается) сбор общего количества байтов, которые передаются через функцию.

Данные в таблице fileMetrics хранятся следующим образом:

enter image description here

Проблема ...

Функции Azure великолепны при масштабировании, в моей настройке у меня есть максимум 6 из этих функций, работающих одновременно. Предполагая, что каждое обрабатываемое сообщение файла уникально - обновление записи (или вставка) в таблицу individualFileInfo работает нормально, так как нет условий гонки.

Однако обновление таблицы fileMetric оказывается проблематичным, так как, скажем, все 6 функций запускаются одновременно, все они намереваются обновить таблицу метрик за один раз (постоянно увеличивая счетчик нового файла или увеличивая существующий счетчик файлов).

Я попытался использовать etag для оптимистических обновлений, а также немного рекурсии для повторной попытки, если ответ 412 вернется из обновления хранилища (пример кода ниже). Но я не могу избежать этого состояния гонки. Кто-нибудь предлагал, как обойти это ограничение или сталкивался с чем-то подобным раньше?

Пример кода, который выполняется в функции для сохранения fileMetric update:

internal static async Task UpdateMetricEntry(IAzureTableStorageService auditTableService, 
    string sourceSystemReference, long addNewBytes, long addIncrementBytes, int retryDepth = 0)
{
    const int maxRetryDepth = 3; // only recurively attempt max 3 times
    var todayYearMonth = DateTime.Now.ToString("yyyyMM");
    try
    {
        // Attempt to get existing record from table storage.
        var result = await auditTableService.GetRecord<VolumeMetric>("VolumeMetrics", sourceSystemReference, todayYearMonth);

        // If the volume metrics table existing in storage - add or edit the records as required.
        if (result.TableExists)
        {
            VolumeMetric volumeMetric = result.RecordExists ?
                // Existing metric record.
                (VolumeMetric)result.Record.Clone()
                    :
                // Brand new metrics record.
                new VolumeMetric
                {
                    PartitionKey = sourceSystemReference,
                    RowKey = todayYearMonth,
                    SourceSystemReference = sourceSystemReference,
                    BillingMonth = DateTime.Now.Month,
                    BillingYear = DateTime.Now.Year,
                    ETag = "*"
                };

            volumeMetric.NewVolumeBytes += addNewBytes;
            volumeMetric.IncrementalVolumeBytes += addIncrementBytes;

            await auditTableService.InsertOrReplace("VolumeMetrics", volumeMetric);
        }
    }
    catch (StorageException ex)
    {
        if (ex.RequestInformation.HttpStatusCode == 412)
        {
            // Retry to update the volume metrics.
            if (retryDepth < maxRetryDepth)
                await UpdateMetricEntry(auditTableService, sourceSystemReference, addNewBytes, addIncrementBytes, retryDepth++);
        }
        else
            throw;
    }
}

Etag отслеживает конфликты, и если этот код получит 412 Http-ответ, он будет повторяться максимум до 3 раз (попытка смягчить проблему). Моя проблема здесь заключается в том, что я не могу гарантировать обновления в хранилище таблиц во всех экземплярах функции.

Спасибо за любые советы заранее!

1 Ответ

1 голос
/ 09 июня 2019

Вы можете поместить вторую часть работы во вторую очередь и функцию, возможно даже поставить триггер на обновления файла.

Поскольку другая операция звучит так, как будто она может занять большую часть времени, она также может удалить часть тепла со второго шага.

Затем вы можете решить любые оставшиеся условия гонки, сосредоточившись только на этой функции. Вы можете использовать сеансы для эффективного ограничения параллелизма. В вашем случае системный идентификатор может быть возможным ключом сеанса. Если вы используете это, у вас будет только одна функция Azure, обрабатывающая данные из одной системы одновременно, что эффективно решает ваши условия гонки.

https://dev.to/azure/ordered-queue-processing-in-azure-functions-4h6c

Изменить: Если вы не можете использовать сеансы для логической блокировки ресурса, вы можете использовать блокировки через хранилище больших двоичных объектов:

https://www.azurefromthetrenches.com/acquiring-locks-on-table-storage/

...