Я пытаюсь активировать конвейер данных, основываясь на существовании файлов *.tar
в S3. Я создал функцию Lambda и написал код Python Boto 3 для активации конвейера данных. Я протестировал функцию Lambda и обнаружил, что она работает, когда существует файл .tar
, когда конвейер данных активирован, если не существует, конвейер данных не активирован.
Я пытаюсь понять причину этих проблем:
- Если в папке s3 нет файлов tar,
print ("datapipeline not activated")
не печатается в журналах.
Если я прерываю конвейер данных в предыдущем запуске и он помечается как завершенный до завершения конвейера данных, то снова запускаю лямбда-функцию. Я получаю приведенную ниже ошибку.
ОШИБКА: поле 'maxActiveInstances' может быть установлено только для объекта по умолчанию для конвейеров по требованию
Когда я пытался установить maxActiveInstances для ресурса EMR в конвейере данных,
{
"errorMessage": "Произошла ошибка (InvalidRequestException) при вызове операции ActivatePipeline: превышен лимит веб-службы: превышено число одновременных выполнений. Установите в поле maxActiveInstances более высокое значение в конвейере или дождитесь, пока выполняющиеся в текущий момент выполнения будут равны завершить, прежде чем пытаться снова ",
"errorType": "InvalidRequestException",
"трассировки стека": [
[
"/Var/task/lambda_function.py",
21,
"Lambda_handler",
"activ = client.activate_pipeline (pipelineId = data_pipeline_id, parameterValues = [])"
],
[
"/Var/runtime/botocore/client.py",
314,
"_Api_call",
"вернуть self._make_api_call (имя_операции, kwargs)"
],
[
"/Var/runtime/botocore/client.py",
612,
"_Make_api_call",
"повысить error_class (parsed_response, operation_name)"
]
]
}
Это скрипт Python, пожалуйста, предоставьте руководство по решению этих проблем.
import boto3
import logging
logger = logging.getLogger()
def lambda_handler(event, context):
client = boto3.client('datapipeline')
s3_client = boto3.client('s3')
#client = boto3.client('datapipeline')
data_pipeline_id="df-xxxxxxxx"
bucket = 'xxxxx'
prefix = 'xxxx/xxxxx/'
paginator = s3_client.get_paginator('list_objects_v2')
response_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)
response_pipeline = client.describe_pipelines(pipelineIds=[data_pipeline_id])
for response in response_iterator:
for object_data in response['Contents']:
key = object_data['Key']
#print (key)
if key.endswith('.tar'):
if(response_pipeline):
activate = client.activate_pipeline(pipelineId=data_pipeline_id,parameterValues=[])
print ("activated")
else:
print ("datapipeline not activated")