Идемпотентность при загрузке заданий BigQuery с использованием Google.Cloud.BigQuery.V2 - PullRequest
0 голосов
/ 14 мая 2018

Вы можете создать задание загрузки csv для загрузки данных из файла csv в Google Cloud Storage, используя BigQueryClient в Google.Cloud.BigQuery.V2, который имеет метод CreateLoadJob.

Как вы можетегарантируйте идемпотентность с этим API, чтобы убедиться, что, скажем, сеть прервалась до получения ответа, и вы начали повторную попытку, и вы не получите многократные загрузки одних и тех же данных в BigQuery?

Пример использования API

    private void LoadCsv(string sourceUri, string tableId, string timePartitionField)
    {
        var tableReference = new TableReference()
        {
            DatasetId = _dataSetId,
            ProjectId = _projectId,
            TableId = tableId
        };

        var options = new CreateLoadJobOptions
        {
            WriteDisposition = WriteDisposition.WriteAppend,
            CreateDisposition = CreateDisposition.CreateNever,
            SkipLeadingRows = 1,
            SourceFormat = FileFormat.Csv,
            TimePartitioning = new TimePartitioning
            {
                Type = _partitionByDayType,
                Field = timePartitionField
            }
        };

        BigQueryJob loadJob = _bigQueryClient.CreateLoadJob(sourceUri: sourceUri,
                                                            destination: tableReference,
                                                            schema: null,
                                                            options: options);

        loadJob.PollUntilCompletedAsync().Wait();
        if (loadJob.Status.Errors == null || !loadJob.Status.Errors.Any())
        {
            //Log success
            return;
        }
        //Log error
    }

Ответы [ 2 ]

0 голосов
/ 14 мая 2018

Вы можете достичь идемпотентности, сгенерировав свой собственный идентификатор работы, основываясь, например, на расположении файла, который вы загрузили, и на целевой таблице.

job_id = 'my_load_job_{}'.format(hashlib.md5(sourceUri+_projectId+_datasetId+tableId).hexdigest())
var options = new CreateLoadJobOptions
        {
            WriteDisposition = WriteDisposition.WriteAppend,
            CreateDisposition = CreateDisposition.CreateNever,
            SkipLeadingRows = 1,
            JobId = job_id, #add this
            SourceFormat = FileFormat.Csv,
            TimePartitioning = new TimePartitioning
            {
                Type = _partitionByDayType,
                Field = timePartitionField
            }
        };

В этом случае, если вы попытаетесь повторно вставить тот же job_id, вы получили ошибку.Вы также можете легко сгенерировать этот job_id для проверки в случае сбоя пула.

0 голосов
/ 14 мая 2018

Есть два места, где вы можете потерять ответ:

  • При создании задания начинать с
  • При опросе на завершение

Первый способ относительно сложен для восстановления без идентификатора работы; Вы могли бы перечислить все вакансии в проекте и попытаться найти ту, которая выглядит как та, которую вы в противном случае создали бы.

Однако клиентская библиотека C # генерирует идентификатор задания, чтобы он мог повторить попытку, или вы можете указать свой собственный идентификатор задания через CreateLoadJobOptions.

Время второго сбоя намного проще: оставьте возвращенное значение BigQueryJob, чтобы в случае сбоя можно было повторить опрос. (Вы можете сохранить имя задания, чтобы его можно было восстановить, даже если ваш процесс умирает, например, в ожидании его завершения.)

...