Облачная функция для запуска задания потока данных DataPrep - PullRequest
0 голосов
/ 07 мая 2018

У меня есть небольшой конвейер, который я пытаюсь выполнить:

  1. , помещенный в GCS Bucket> 2. Облачная функция запускает задание потока данных, когда файл помещается в корзину GCS (не работает)> 3. Записывает в таблицу Big Query (эта часть работает)

Я создал задание Dataflow через Dataprep, поскольку он имеет приятный пользовательский интерфейс для выполнения всех моих преобразований перед записью в таблицу BigQuery (запись в BigQuery работает нормально), а функция Cloud запускается при загрузке файла в корзину GCS. , Однако функция Cloud не запускает задание Dataflow (которое я написал в Dataprep).

Пожалуйста, посмотрите на мой пример кода ниже моей облачной функции, если я могу получить какие-либо указатели на то, почему задание потока данных не запускается.

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * @param {!Object} event The Cloud Functions event.
 * @param {!Function} The callback function.
 */
exports.processFile = (event, callback) => {
  console.log('Processing file: ' + event.data.name);
  callback();

  const google = require('googleapis');

 exports.CF_GCStoDataFlow_v2 = function(event, callback) {
  const file = event.data;
  if (file.resourceState === 'exists' && file.name) {
    google.auth.getApplicationDefault(function (err, authClient, projectId) {
      if (err) {
        throw err;
      }

      if (authClient.createScopedRequired && authClient.createScopedRequired()) {
        authClient = authClient.createScoped([
          'https://www.googleapis.com/auth/cloud-platform',
          'https://www.googleapis.com/auth/userinfo.email'
        ]);
      }

      const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });

      dataflow.projects.templates.create({
        projectId: projectId,
        resource: {
          parameters: {
            inputFile: `gs://${file.bucket}/${file.name}`,
            outputFile: `gs://${file.bucket}/${file.name}`
          },
          jobName: 'cloud-dataprep-csvtobq-v2-281345',
          gcsPath: 'gs://mygcstest-pipeline-staging/temp/'
        }
      }, function(err, response) {
        if (err) {
          console.error("problem running dataflow template, error was: ", err);
        }
        console.log("Dataflow template response: ", response);
        callback();
      });

    });
  }
 };
};

DataProc job

Ответы [ 3 ]

0 голосов
/ 08 мая 2018

Похоже, вы помещаете CF_GCStoDataFlow_v2 в processFile, поэтому часть кода потока данных не выполняется.

Ваша функция должна выглядеть следующим образом:

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * @param {!Object} event The Cloud Functions event.
 * @param {!Function} The callback function.
 */
exports.CF_GCStoDataFlow_v2 = (event, callback) => {

  const google = require('googleapis');

  if (file.resourceState === 'exists' && file.name) {
    google.auth.getApplicationDefault(function (err, authClient, projectId) {
      if (err) {
        throw err;
      }

      if (authClient.createScopedRequired && authClient.createScopedRequired()) {
        authClient = authClient.createScoped([
          'https://www.googleapis.com/auth/cloud-platform',
          'https://www.googleapis.com/auth/userinfo.email'
        ]);
      }

      const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });

      dataflow.projects.templates.create({
        projectId: projectId,
        resource: {
          parameters: {
            inputFile: `gs://${file.bucket}/${file.name}`,
            outputFile: `gs://${file.bucket}/${file.name}`
          },
          jobName: '<JOB_NAME>',
          gcsPath: '<BUCKET_NAME>'
        }
      }, function(err, response) {
        if (err) {
          console.error("problem running dataflow template, error was: ", err);
        }
        console.log("Dataflow template response: ", response);
        callback();
      });

    });
  }

  callback();
};

Убедитесь, что вы изменили значение в «Функция для выполнения» на CF_GCStoDataFlow_v2

0 голосов
/ 06 июня 2018

Этот фрагмент может помочь, он использует другой метод API потока данных (запуск), он работал для меня, помните, что вам нужно указать URL шаблона, а также проверить файл метаданных (вы можете найти его в том же каталоге, что и шаблон, когда выполняется через интерфейс dataprep) файл, который вы включаете правильные параметры

dataflow.projects.templates.launch({
   projectId: projectId,
   location: location,
   gcsPath: jobTemplateUrl,
   resource: {
     parameters: {
       inputLocations : `{"location1" :"gs://${file.bucket}/${file.name}"}`,
       outputLocations: `{"location1" : "gs://${destination.bucket}/${destination.name}"}"}`,
     },
      environment: {
        tempLocation: `gs://${destination.bucket}/${destination.tempFolder}`,
        zone: "us-central1-f"
     },
     jobName: 'my-job-name',

   }
 }
0 голосов
/ 07 мая 2018

Вы отправили вам работу Dataproc? Он начал работать? Приведенная ниже документация может дать представление о том, с чего начать!

https://cloud.google.com/dataproc/docs/concepts/jobs/life-of-a-job

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...