Избегайте ожидания веб-задания при загрузке небольшой группы данных в обозреватель данных Azure. - PullRequest
0 голосов
/ 11 октября 2019

У меня есть веб-джоб, который получает события щелчка на сайте от концентратора событий Azure, а затем загружает эти события в ADX.

public static async Task Run([EventHubTrigger] EventData[] events, ILogger logger)
{
    // Process events
    try
    {
        var ingestResult = await _adxIngester.IngestAsync(events);
        if (!ingestResult) 
        {
            AppInsightLogError();
            logger.LogError();
        }

    }
    catch(Exception ex)
    {
        AppInsighLogError();
        logger.LogError()
    }
}

Я использовал прием в очередь и отключил FlushImmediately при входе в ADX, что позволяет использовать пакетный прием. Когда события не соответствуют принятой по умолчанию политике IngestionBatch в размере 1000 событий / 1 ГБ данных, ADX ждет 5 минут , пока не вернется Success статус, что означает Run также в течение этого времени.

public async Task<bool> IngestAsync(...) 
{
    IKustoQueuedIngestClient client = KustoIngestFactory.CreateQueuedIngestClient(kustoConnectionString);

    var kustoIngestionProperties = new KustoQueuedIngestionProperties(databaseName: "myDB", tableName: "events")
    {
        ReportLevel = IngestionReportLevel.FailuresOnly,
        ReportMethod = IngestionReportMethod.Table,
        FlushImmediately = false
    };

    var streamIdentifier = Guid.NewGuid();
    var clientResult = await client.IngestFromStreamAsync(...);
    var ingestionStatus = clientResult.GetIngestionStatusBySourceId(streamIdentifier);

    while (ingestionStatus.Status == Status.Pending)
    {
        await Task.Delay(TimeSpan.FromSeconds(15));
        ingestionStatus = clientResult.GetIngestionStatusBySourceId(streamIdentifier);
    }

    if (ingestionStatus.Status == Status.Failed) 
    {
        return false;
    }

    return true;
}

Поскольку я не хочу, чтобы моя веб-работа ждала так долго, когда не так много событий или просто QA работает, я внес следующие изменения:

  • Дон't await on IngestAsync, таким образом сделайте Run синхронным методом
  • Добавьте параметр Action onError в IngestAsync и вызовите его, если задача загрузки не удалась. Звоните AppInsightLogError() и logger.LogError() внутри onError вместо возврата false
  • Замените IngestFromStreamAsync на IngestFromStream

В принципе, я хочу убедиться, что события достигают Azure Queue и выдает исключение (если есть), прежде чем я запрашиваю статус загрузки, затем выйдите из метода Run, и мне не придется ждать опроса статуса, если что-то не получится, то это будет журнал.

Мой вопрос:

  • Является ли хорошей практикой избегать минут веб-работы? Если нет, то почему?
  • Если да, достаточно ли мое решение для этой проблемы? Иначе как мне это сделать?

Ответы [ 3 ]

0 голосов
/ 16 октября 2019
  • IngestFromStreamAsync загрузите данные в большой двоичный объект и опубликуйте сообщение во входной очереди управления данными. Он не будет ждать времени агрегации, и вы получите окончательное состояние В очереди .
  • FlushImmediately по умолчанию имеет значение false.
  • Если нет никакой дополнительной обработки, рассмотрите возможность использования Event Hub для подключения к Kusto

[Отредактировано] в ответ на комментарии:

  • Состояние в очереди означает, что большой объект ожидает приема. Вы можете отслеживать статус с помощью команды show ingestion failures , метрики и журналы приема .
  • По умолчанию подключение к концентратору событий проходит через очереди. Он будет использовать потоковую загрузку, только если он установлен в качестве политики для базы данных / таблицы
  • Часть обработки может быть выполнена в ADX с использованием сопоставления загрузки и политики обновления.
0 голосов
/ 22 октября 2019

Если вы принимаете небольшие партии данных и хотите сократить время дозирования, прочитайте следующую статью: https://docs.microsoft.com/en-us/azure/kusto/concepts/batchingpolicy Политика пакетирования при проглатывании позволяет контролировать пределы дозирования для базы данных или таблицы.

0 голосов
/ 16 октября 2019

Прием внутрь происходит в несколько этапов. Один этап выполняется на стороне клиента, а другой этап - на стороне сервера:

  1. Используемый вами код клиента для загрузки собирается в ваш поток и загружает его в большой двоичный объект, итогда он отправит сообщение в очередь. Любые исключения, возникшие на этом этапе, действительно будут распространяться на ваш код, поэтому вам также следует использовать некоторый блок try-catch, где в блоке catch вы можете записать сообщение об ошибке, как вы предлагали. Вы можете использовать IngestFromStreamAsync с ключевым словом await или использовать IngestFromStream. Первый вариант лучше, если вы хотите освободить рабочий поток и сэкономить ресурсы. Но выбор между этими двумя не имеет ничего общего с опросом. Опрос относится ко второму этапу.
  2. Компонент DataManagement в Kusto постоянно прослушивает сообщения в очереди, поэтому, как только он доберется до вашего нового сообщения, он прочтет его и увидит некоторую информацию метаданных о новомзапрос на загрузку, такой как URI большого двоичного объекта, в котором хранятся данные, и, например, таблица Azure, в которой ошибки / ход выполнения должны обновлятьсяЭта фаза выполняется удаленно на стороне сервера, и у вас есть возможность ждать в своем клиентском коде для каждого отдельного приема и опроса, пока сервер не завершит процесс приема. Если на этом этапе есть какие-либо исключения, то, конечно, они не будут распространены на ваш клиентский код, но вы сможете изучить таблицу Azure и посмотреть, что произошло. Вы также можете отложить проверку состояния и выполнить ее в другом задании.
...