Запуск задания Spark AWS EMR из события S3 - PullRequest
0 голосов
/ 14 марта 2020

Я рассматриваю возможность использования AWS EMR Spark для запуска приложения Spark для очень больших файлов Parquet, хранящихся на S3. В целом процесс заключается в том, что процесс Java будет загружать эти большие файлы в S3, и я хотел бы автоматически запускать задание Spark (внедренное с помощью ключей (имен S3) загруженных файлов) в этих файлах. .

В идеале для подключения можно использовать некий триггер EMR на основе S3; то есть я настраиваю EMR / Spark для «прослушивания» сегмента S3 и запуска задания Spark при выполнении upsertis для этого сегмента.

Если такого триггера не существует, я, вероятно, мог бы что-то запутать, например, сбросить лямбду с события S3 и заставить лямбду как-то вызвать задание EMR Spark.

Однако мое понимание ( пожалуйста, поправьте меня, если я '' Это неправильно) единственный способ запустить задание Spark:

  1. Упаковать задание в виде исполняемого файла JAR; и
  2. Отправьте его в кластер (EMR или иным образом) с помощью spark-submit сценария оболочки

Так что, если мне нужно будет сделать lambda-kludge Я не совсем уверен, каков наилучший способ запуска задания EMR / Spark, поскольку Lambdas изначально не несет spark-submit во время выполнения. И даже если бы я настроил свою собственную среду выполнения Lambda (которую я считаю теперь можно сделать), это решение уже кажется действительно вялым и нетерпимым.

Кто-нибудь когда-либо запускает EMR / Spark задание от триггера S3 или любой AWS триггер до?

1 Ответ

1 голос
/ 14 марта 2020

Задание EMR Spark можно выполнить как шаг, как в Добавление шага Spark . Шаг не только во время создания кластера EMR после bootstrap.

aws emr add-steps --cluster-id j-2AXXXXXXGAPLF --steps Type=Spark,Name="Spark Program",ActionOnFailure=CONTINUE,Args=[--class,org.apache.spark.examples.SparkPi,/usr/lib/spark/examples/jars/spark-examples.jar,10]

Поскольку это CLI AWS, вы можете вызвать его из Lambda, в котором вы также можете загрузить файл JAR в HDFS. или S3, затем укажите его, используя s3: // или hdfs: //.

В документе также есть пример Java.

AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
AmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials);

StepFactory stepFactory = new StepFactory();
AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(credentials);
AddJobFlowStepsRequest req = new AddJobFlowStepsRequest();
req.withJobFlowId("j-1K48XXXXXXHCB");

List<StepConfig> stepConfigs = new ArrayList<StepConfig>();

HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
            .withJar("command-runner.jar")
            .withArgs("spark-submit","--executor-memory","1g","--class","org.apache.spark.examples.SparkPi","/usr/lib/spark/examples/jars/spark-examples.jar","10");            

StepConfig sparkStep = new StepConfig()
            .withName("Spark Step")
            .withActionOnFailure("CONTINUE")
            .withHadoopJarStep(sparkStepConf);

stepConfigs.add(sparkStep);
req.withSteps(stepConfigs);
AddJobFlowStepsResult result = emr.addJobFlowSteps(req);
...