У меня была такая же проблема, я создал GAE, который запускает задание DataFlow.в любое время с другими параметрами, в моем случае, каждый раз, когда я отправляю другой диапазон дат.И затем я планирую это с работой CRON.
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
credentials = GoogleCredentials.get_application_default()
service = build('dataflow', 'v1b3', credentials=credentials)
# Set the following variables to your values.
JOBNAME = '[JOB_NAME]'
PROJECT = '[YOUR_PROJECT_ID]'
BUCKET = '[YOUR_BUCKET_NAME]'
TEMPLATE = '[YOUR_TEMPLATE_NAME]'
GCSPATH="gs://{bucket}/templates/{template}".format(bucket=BUCKET, template=TEMPLATE)
BODY = {
"jobName": "{jobname}".format(jobname=JOBNAME),
"parameters": {
"inputFile" : "gs://{bucket}/input/my_input.txt",
"outputFile": "gs://{bucket}/output/my_output".format(bucket=BUCKET)
},
"environment": {
"tempLocation": "gs://{bucket}/temp".format(bucket=BUCKET),
"zone": "us-central1-f"
}
}
request = service.projects().templates().launch(projectId=PROJECT, gcsPath=GCSPATH, body=BODY)
response = request.execute()
print(response)
app.yaml
runtime: python37
service: run-data-flow
cron.yaml
cron:
- description: "copy history daily Job"
url: /run
target: run-data-flow
schedule: every 1 hours from 9:00 to 23:00
retry_parameters:
min_backoff_seconds: 3
max_doublings: 5