Как обмениваться данными в `AWS Step Functions`, не передавая их между шагами - PullRequest
0 голосов
/ 23 февраля 2019

Я использую AWS Step Functions и имею следующий рабочий процесс

AWS Step Functions workflow

initStep - Это обработчик лямбда-функции, который получаетнекоторые данные и отправляет их на SQS для внешней службы.

activity = os.getenv('ACTIVITY')
queue_name = os.getenv('QUEUE_NAME')

def lambda_handler(event, context):
  event['my_activity'] = activity
  data = json.dumps(event)

  # Retrieving a queue by its name
  sqs = boto3.resource('sqs')
  queue = sqs.get_queue_by_name(QueueName=queue_name)

  queue.send_message(MessageBody=data, MessageGroupId='messageGroup1' + str(datetime.time(datetime.now())))

  return event

validationWaiting - Это activity, который ожидает ответа от внешней службы, включающей данные.

завершено - это лямбда-обработчик функций, который использует данные из initStep.

def lambda_handler(event, context):
  email = event['email'] if 'email' in event else None
  data = event['data'] if 'data' in event else None

  client = boto3.client(service_name='ses')
  to = email.split(', ')
  message_conrainer = {'Subject': {'Data': 'Email from step functions'},
           'Body': {'Html': {
               'Charset': "UTF-8",
               'Data': """<html><body>
                            <p>""" + data """</p>
                            </body> </html> """
           }}}

  destination = {'ToAddresses': to,
               'CcAddresses': [],
               'BccAddresses': []}

  return client.send_email(Source=from_addresses,
                         Destination=destination,
                         Message=message_container)

Это работает, но проблема в том, что яотправка полных данных из initStep во внешнюю службу, для последующей передачи в complete.Потенциально можно добавить больше шагов.

Я полагаю, что было бы лучше поделиться ими как некими глобальными данными (текущей функции шага), чтобы я мог добавлять или удалять шаги, и данные все еще были бы доступны длявсе.

Ответы [ 3 ]

0 голосов
/ 26 февраля 2019

Здесь короткое и простое решение с InputPath и ResultPath.Мои лямбда-проверки Check_Ubuntu_Updates возвращают список экземпляров, готовых к обновлению.Этот список экземпляров получен на шаге Notify_Results, затем он использует эти данные.Помните, что если у вас есть несколько ResultPath в вашей функции шага и вам нужно более 1 ввода в шаге, вы можете использовать InputPath только с $.

{ "Comment": "A state machine that check some updates systems available.", "StartAt": "Check_Ubuntu_Updates", "States": { "Check_Ubuntu_Updates": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:#############:function:Check_Ubuntu_Updates", "ResultPath": "$.instances", "Next": "Notify_Results" }, "Notify_Results": { "Type": "Task", "InputPath": "$.instances", "Resource": "arn:aws:lambda:us-east-1:#############:function:Notify_Results", "End": true } } }

0 голосов
/ 28 февраля 2019

На основании ответа Марчин Сучарски Я придумал собственное решение.

Мне нужно было использовать Type: Task, поскольку initStep - это лямбда, которая отправляет SQS.

Мне не нужно было InputPath в ValidationWaiting, а только ResultPath, в котором хранятся данные, полученные в действии.

Я работаю с Без сервера рамки, вот мое окончательное решение:

StartAt: initStep
States: 
  initStep:
    Type: Task
    Resource: arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:init-step
    Next: ValidationWaiting
  ValidationWaiting:
    Type: Task
    ResultPath: $.validationOutput
    Resource: arn:aws:states:#{AWS::Region}:#{AWS::AccountId}:activity:validationActivity
    Next: Complete
    Catch:
      - ErrorEquals:
        - States.ALL
      ResultPath: $.validationOutput
      Next: Complete
  Complete:
    Type: Task
    Resource: arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:complete-step
    End: true
0 голосов
/ 24 февраля 2019

Вы можете использовать InputPath и ResultPathinitStep вы бы отправляли только необходимые данные во внешнюю службу (возможно, вместе с некоторым уникальным идентификатором выполнения).На шаге ValidaitonWaiting вы можете установить следующие свойства (в определении конечного автомата):

  • InputPath: Какие данные будут предоставлены для GetActivityTask.Возможно, вы хотите установить что-то вроде $.execution_unique_id, где execution_unique_id - это поле в ваших данных, которое внешняя служба использует для идентификации выполнения (чтобы сопоставить его с конкретным запросом во время initStep).
  • ResultPath: Где выходные данные ValidationWaiting Activity будут сохранены в данных.Вы можете установить его на $.validation_output, и там будет присутствовать результат json от внешней службы.

Таким образом, вы можете отправлять во внешнюю службу только те данные, которые действительно ей нужны, и вы не потеряетедоступ к любым данным, которые были ранее (до шага ValidationWaiting) во входных данных.

Например, у вас может быть следующее определение конечного автомата:

{
  "StartAt": "initStep",
  "States": {
    "initStep": {
      "Type": "Pass",
      "Result": {
        "executionId": "some:special:id",
        "data": {},
        "someOtherData": {"value": "key"}
      },
      "Next": "ValidationWaiting"
    },
    "ValidationWaiting": {
      "Type": "Pass",
      "InputPath": "$.executionId",
      "ResultPath": "$.validationOutput",
      "Result": {
        "validationMessages": ["a", "b"]
      },
      "Next": "Complete"
    },
    "Complete": {
      "Type": "Pass",
      "End": true
    }
  }
}

Я использовалPass указывает для initStep и ValidationWaiting для упрощения примера (я не запускал его, но он должен работать).Поле Result относится к задаче Pass и эквивалентно результату ваших лямбда-функций или действий.

В этом сценарии шаг Complete будет содержать следующие данные:

{
  "executionId": "some:special:id",
  "data": {},
  "someOtherData": {"value": key"},
  "validationOutput": {
    "validationMessages": ["a", "b"]
  }
}

Таким образом, результат шага ValidationWaiting был сохранен в поле validationOutput.

...