Есть ли способ получить входные значения пошаговых функций в Args шага EMR - PullRequest
1 голос
/ 09 января 2020

Мы выполняем пакетные задания зажигания с использованием AWS EMR кластеров. Эти задания выполняются периодически, и мы хотели бы организовать их с помощью AWS Step Functions.

По состоянию на ноябрь 2019 г. Step Functions изначально поддерживает EMR. При добавлении шага в кластер мы можем использовать следующую конфигурацию:

"Some Step": {
    "Type": "Task",
    "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
    "Parameters": {
        "ClusterId.$": "$.cluster.ClusterId",
        "Step": {
            "Name": "FirstStep",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--class",
                    "com.some.package.Class",
                    "JarUri",
                    "--startDate",
                    "$.time",
                    "--daysToLookBack",
                    "$.daysToLookBack"
                ]
             }
         }
     },
     "Retry" : [
         {
             "ErrorEquals": [ "States.ALL" ],
             "IntervalSeconds": 1,
             "MaxAttempts": 1,
             "BackoffRate": 2.0
         }
     ],
     "ResultPath": "$.firstStep",
     "End": true
}

В списке аргументов HadoopJarStep мы хотели бы установить аргументы динамически. например, если вход выполнения конечного автомата:

{
    "time": "2020-01-08",
    "daysToLookBack": 2
}

Строки в конфигурации, начинающиеся с "$." должны быть соответственно заменены при выполнении конечного автомата, и шаг в кластере EMR должен запускаться command-runner.jar spark-submit --class com.some.package.Class JarUri --startDate 2020-01-08 --daysToLookBack 2 Но вместо этого он запускает command-runner.jar spark-submit --class com.some.package.Class JarUri --startDate $.time --daysToLookBack $.daysToLookBack.

Кто-нибудь знает, есть ли способ сделать это?

1 Ответ

2 голосов
/ 10 января 2020

Параметры позволяют вам определять пары ключ-значение, так как значение для ключа «Args» является массивом, вы не сможете динамически ссылаться на конкретный элемент c в массиве, вам потребуется вместо этого ссылаться на весь массив. Например, «Args. $»: «$ .Input.ArgsArray».

Так что для вашего варианта использования лучшим способом для достижения этой цели было бы добавление состояния предварительной обработки перед вызовом этого состояния. В состоянии предварительной обработки вы можете либо вызвать функцию Lambda и отформатировать ваш ввод / вывод с помощью кода, либо просто добавить значение динамического c в массив, для которого можно использовать Pass State для переформатируйте данные и затем в параметрах состояния вашей задачи вы можете использовать JSONPath , чтобы получить массив, который вы определили в препроцессоре. Вот пример:

{
"Comment": "A Hello World example of the Amazon States Language using Pass states",
"StartAt": "HardCodedInputs",
"States": {
    "HardCodedInputs": {
        "Type": "Pass",
        "Parameters": {
            "cluster": {
                "ClusterId": "ValueForClusterIdVariable"
            },
            "time": "ValueForTimeVariable",
            "daysToLookBack": "ValueFordaysToLookBackVariable"
        },
        "Next": "Pre-Process"
    },
    "Pre-Process": {
        "Type": "Pass",
        "Parameters": {
            "FormattedInputsForEmr": {
                "ClusterId.$": "$.cluster.ClusterId",
                "Args": [
                    {
                        "Arg1": "spark-submit"
                    },
                    {
                        "Arg2": "--class"
                    },
                    {
                        "Arg3": "com.some.package.Class"
                    },
                    {
                        "Arg4": "JarUri"
                    },
                    {
                        "Arg5": "--startDate"
                    },
                    {
                        "Arg6.$": "$.time"
                    },
                    {
                        "Arg7": "--daysToLookBack"
                    },
                    {
                        "Arg8.$": "$.daysToLookBack"
                    }
                ]
            }
        },
        "Next": "Some Step"
    },
    "Some Step": {
        "Type": "Pass",
        "Parameters": {
            "ClusterId.$": "$.FormattedInputsForEmr.ClusterId",
            "Step": {
                "Name": "FirstStep",
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args.$": "$.FormattedInputsForEmr.Args[*][*]"
                }
            }
        },
        "End": true
    }
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...