Мне удалось создать простую облачную функцию, которая запускает задание Dataproc в GCS событие создания файла. В этом примере файл в GCS содержит запрос Pig для выполнения. Однако вы можете следовать документации Dataproc API для создания версии PySpark.
index.js
:
exports.submitJob = (event, callback) => {
const google = require('googleapis');
const projectId = 'my-project'
const clusterName = 'my-cluster'
const file = event.data;
if (file.name) {
google.auth.getApplicationDefault(function (err, authClient, projectId) {
if (err) {
throw err;
}
const queryFileUri = "gs://" + file.bucket + "/" + file.name
console.log("Using queryFileUri: ", queryFileUri);
if (authClient.createScopedRequired && authClient.createScopedRequired()) {
authClient = authClient.createScoped([
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/userinfo.email'
]);
}
const dataproc = google.dataproc({ version: 'v1beta2', auth: authClient });
dataproc.projects.regions.jobs.submit({
projectId: projectId,
region: "global",
resource: {
"job": {
"placement": {"clusterName": clusterName},
"pigJob": {
"queryFileUri": queryFileUri,
}
}
}
}, function(err, response) {
if (err) {
console.error("Error submitting job: ", err);
}
console.log("Dataproc response: ", response);
callback();
});
});
} else {
throw "Skipped processing file!";
}
callback();
};
Убедитесь, что для Function to execute
установлено значение submitJob
.
package.json
:
{
"name": "sample-cloud-storage",
"version": "0.0.1",
"dependencies":{ "googleapis": "^21.3.0" }
}
Следующий пост в блоге дал мне много идей, как начать:
https://cloud.google.com/blog/big-data/2016/04/scheduling-dataflow-pipelines-using-app-engine-cron-service-or-cloud-functions