AWS Lambda для активации конвейера данных - PullRequest
0 голосов
/ 10 сентября 2018

Я пытаюсь активировать конвейер данных, основываясь на существовании файлов *.tar в S3. Я создал функцию Lambda и написал код Python Boto 3 для активации конвейера данных. Я протестировал функцию Lambda и обнаружил, что она работает, когда существует файл .tar, когда конвейер данных активирован, если не существует, конвейер данных не активирован.

Я пытаюсь понять причину этих проблем:

  1. Если в папке s3 нет файлов tar, print ("datapipeline not activated") не печатается в журналах.
  2. Если я прерываю конвейер данных в предыдущем запуске и он помечается как завершенный до завершения конвейера данных, то снова запускаю лямбда-функцию. Я получаю приведенную ниже ошибку.

    ОШИБКА: поле 'maxActiveInstances' может быть установлено только для объекта по умолчанию для конвейеров по требованию

  3. Когда я пытался установить maxActiveInstances для ресурса EMR в конвейере данных,

    { "errorMessage": "Произошла ошибка (InvalidRequestException) при вызове операции ActivatePipeline: превышен лимит веб-службы: превышено число одновременных выполнений. Установите в поле maxActiveInstances более высокое значение в конвейере или дождитесь, пока выполняющиеся в текущий момент выполнения будут равны завершить, прежде чем пытаться снова ", "errorType": "InvalidRequestException", "трассировки стека": [ [ "/Var/task/lambda_function.py", 21, "Lambda_handler", "activ = client.activate_pipeline (pipelineId = data_pipeline_id, parameterValues ​​= [])" ], [ "/Var/runtime/botocore/client.py", 314, "_Api_call", "вернуть self._make_api_call (имя_операции, kwargs)" ], [ "/Var/runtime/botocore/client.py", 612, "_Make_api_call", "повысить error_class (parsed_response, operation_name)" ] ] }

Это скрипт Python, пожалуйста, предоставьте руководство по решению этих проблем.

import boto3
import logging
logger = logging.getLogger()

def lambda_handler(event, context):
client = boto3.client('datapipeline')
s3_client = boto3.client('s3')
#client = boto3.client('datapipeline')
data_pipeline_id="df-xxxxxxxx"
bucket = 'xxxxx'
prefix = 'xxxx/xxxxx/'
paginator = s3_client.get_paginator('list_objects_v2')
response_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)
response_pipeline = client.describe_pipelines(pipelineIds=[data_pipeline_id])
for response in response_iterator:
for object_data in response['Contents']:
key = object_data['Key']
    #print (key)
if key.endswith('.tar'):
if(response_pipeline):
activate = client.activate_pipeline(pipelineId=data_pipeline_id,parameterValues=[])
print ("activated")
else:
print ("datapipeline not activated")

1 Ответ

0 голосов
/ 28 декабря 2018

Я думаю, что я только что видел те же симптомы, надеюсь, что поделиться нашим исправлением может вам помочь?

Мы отменили экземпляр конвейера, и нам нужно было повторно включить конвейер, чтобы обойти эту ошибку.

...