Я пытаюсь автоматизировать некоторые задачи по очистке данных, загружая файлы в облачное хранилище, запуская их по конвейеру и загружая результаты.
Я создал шаблон для выполнения моего конвейера с использованиемГрафический интерфейс в Dataprep, и я пытаюсь автоматизировать загрузку и выполнение шаблона с помощью клиентских библиотек Google, в частности, на Python.
Однако я обнаружил, что при выполнении задания со скриптом Python полный шаблонне выполняется;иногда некоторые этапы не выполняются, иногда выходной файл, размер которого должен быть мегабайтом, составляет менее 500 байт.Это зависит от шаблона, который я использую.У каждого шаблона есть своя собственная проблема.
Я пытался разбить большой шаблон на меньшие шаблоны, чтобы последовательно применять, чтобы я мог видеть, в чем проблема, но именно здесь я обнаружил, что у каждого шаблона есть своя собственная проблема.Я также попытался создать задание из интерфейса мониторинга потока данных и обнаружил, что все, что создано с этим, будет работать отлично, что означает, что должна быть некоторая проблема с созданным мной сценарием.
def runJob(bucket, template, fileName):
#open connection with the needed credentials
credentials = GoogleCredentials.get_application_default()
service = build('dataflow', 'v1b3', credentials = credentials)
#name job after file being processed
jobName = fileName.replace('.csv', '')
projectId = 'my-project'
#find the template to run on the dataset
templatePath = "gs://{bucket}/me@myemail.com/temp/{template}".format(bucket = bucket, template=template)
#construct job JSON
body = {
"jobName":"{jobName}".format(jobName=jobName),
"parameters" : {
"inputLocations" :"{\"location1\":\"gs://" + bucket + "/me@myemail.com/RawUpload/" + fileName + "\"}",
"outputLocations":"{\"location1\":\"gs://" + bucket + "/me@myemail.com/CleanData/" + fileName.replace('.csv', '_auto_delete_2') + "\"}",
},
"environment" : {
"tempLocation":"gs://{bucket}/me@myemail.com/temp".format(bucket = bucket),
"zone":"us-central1-f"
}
}
#create and execute HTTPRequest
request = service.projects().templates().launch(projectId=projectId, gcsPath=templatePath, body=body)
response = request.execute()
#notify user
print(response)
Использованиев формате JSON мой ввод параметров такой же, как при использовании интерфейса мониторинга.Это говорит мне о том, что либо что-то происходит в фоновом режиме интерфейса мониторинга, о котором я не знаю, и поэтому не включаю в него, или есть проблема с кодом, который я создал.
Как я уже сказалвыше, проблема варьируется в зависимости от шаблона, который я пытаюсь запустить, но наиболее распространенным является чрезвычайно маленький выходной файл.Выходной файл будет на величины меньше, чем должно быть.Это связано с тем, что он будет содержать только заголовки CSV и некоторые случайные выборки первой строки в данных, а также неправильно отформатирован в первую очередь для файла CSV.
Кто-нибудь знает, что мне не хватает, или понимает, что я делаю неправильно?