Как вызвать работу Google Dataproc из облачной функции Google - PullRequest
0 голосов
/ 09 мая 2018

Запускать функцию облака каждый раз, когда новый файл загружается в корзину облачного хранилища. Эта функция должна вызвать задание dataproc, написанное в pyspark, чтобы прочитать файл и загрузить его в BigQuery.

Я хочу знать, как вызвать задание Google Dataproc из облачной функции. Пожалуйста, предложите.

1 Ответ

0 голосов
/ 15 мая 2018

Мне удалось создать простую облачную функцию, которая запускает задание Dataproc в GCS событие создания файла. В этом примере файл в GCS содержит запрос Pig для выполнения. Однако вы можете следовать документации Dataproc API для создания версии PySpark.

index.js:

exports.submitJob = (event, callback) => {

  const google = require('googleapis');

  const projectId = 'my-project'
  const clusterName = 'my-cluster'

  const file = event.data;
  if (file.name) {
    google.auth.getApplicationDefault(function (err, authClient, projectId) {
      if (err) {
        throw err;
      }

      const queryFileUri = "gs://" + file.bucket + "/" + file.name
      console.log("Using queryFileUri: ", queryFileUri);

      if (authClient.createScopedRequired && authClient.createScopedRequired()) {
        authClient = authClient.createScoped([
          'https://www.googleapis.com/auth/cloud-platform',
          'https://www.googleapis.com/auth/userinfo.email'
        ]);
      }

      const dataproc = google.dataproc({ version: 'v1beta2', auth: authClient });

      dataproc.projects.regions.jobs.submit({
          projectId: projectId,
          region: "global",
          resource: {
            "job": {
              "placement": {"clusterName": clusterName},
              "pigJob": {
                "queryFileUri": queryFileUri,
              }
            }
          }
        }, function(err, response) {
          if (err) {
            console.error("Error submitting job: ", err);
          }
          console.log("Dataproc response: ", response);
          callback();
        });

    });
  } else {
    throw "Skipped processing file!";
  }

  callback();
};

Убедитесь, что для Function to execute установлено значение submitJob.

package.json:

{
  "name": "sample-cloud-storage",
  "version": "0.0.1",
  "dependencies":{ "googleapis": "^21.3.0" }
}

Следующий пост в блоге дал мне много идей, как начать: https://cloud.google.com/blog/big-data/2016/04/scheduling-dataflow-pipelines-using-app-engine-cron-service-or-cloud-functions

...