Не удается отправить задание AWS Datapipe через java sdk в очередь не по умолчанию - PullRequest
0 голосов
/ 15 марта 2019

Я настроил свой кластер EMR для второй очереди Hadoop. Очередь «по умолчанию» и та, которую я назвал «золото». Очередь присутствует и работает, когда мой кластер EMR запускается, как я вижу в веб-интерфейсе Hadoop для Yarn.

Мой код Java создает конвейерное задание (в данном случае HiveActivity), и я добавляю список объектов Field в объект Pipeline. Эти поля являются настройками для конвейера.

Одно из таких полей (необязательно в соответствии с документами на https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-hiveactivity.html) - это 'hadoopQueue'. Когда я не устанавливаю это поле, по умолчанию используется очередь 'default'. Задание завершается, и все хорошо .

Когда я устанавливаю поле с этим кодом

//hadoopQueue
field = new Field();
field.setKey("hadoopQueue");
field.setStringValue("gold");
fields.add(field);

Задание не завершено, и в моих журналах конвейера данных появляется следующая ошибка

Exception in thread "main" java.lang.RuntimeException: Local file does not exist.
at com.amazon.elasticmapreduce.scriptrunner.ScriptRunner.fetchFile(ScriptRunner.java:30)
at com.amazon.elasticmapreduce.scriptrunner.ScriptRunner.main(ScriptRunner.java:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:239)
at org.apache.hadoop.util.RunJar.main(RunJar.java:153)

Я также получаю эту ошибку, если я явно устанавливаю для очереди значение «по умолчанию», например

//hadoopQueue
  field = new Field();
  field.setKey("hadoopQueue");
  field.setStringValue("default");
  fields.add(field);

Кажется, что это работает, только если я не установил эту опцию, но не использовал ее, это означает, что я не могу указать другую очередь.

Кто-нибудь смог успешно использовать опцию 'hadoopQueue' таким образом?

public static String scheduleDataPipelineActivity(DataPipeline dpl, String pipelineId, String script, String stepName,
  Map<String,String> params) {

PutPipelineDefinitionRequest putDefReq = new PutPipelineDefinitionRequest();
putDefReq.setPipelineId(pipelineId);
List<PipelineObject> objects = new ArrayList<>();

List<Field> fields = new ArrayList<>();

Field field = new Field();
field.setKey("failureAndRerunMode");
field.setStringValue("CASCADE");
fields.add(field);

field = new Field();
field.setKey("resourceRole");
field.setStringValue("DataPipelineDefaultResourceRole");
fields.add(field);

field = new Field();
field.setKey("role");
field.setStringValue("DataPipelineDefaultRole");
fields.add(field);

field = new Field();
field.setKey("pipelineLogUri");
field.setStringValue("s3://" + getBucketName() + "/logs");
fields.add(field);

field = new Field();
field.setKey("scheduleType");
field.setStringValue("ONDEMAND");
fields.add(field);

if ((params != null) && (params.size() > 0)) {
  for (Map.Entry<String,String> entry : params.entrySet()) {
    field = new Field();
    field.setKey("scriptVariable");
    field.setStringValue(entry.getKey() + "=" + entry.getValue());
    fields.add(field);
  }
}

PipelineObject po = new PipelineObject().withName("Default").withId("Default");
po.setFields(fields);
objects.add(po);

fields = new ArrayList<>();
field = new Field();
field.setKey("stage");
field.setStringValue("false");
fields.add(field);

field = new Field();
if (script.startsWith("s3://")) {
  field.setKey("scriptUri");
}
else if (script.length() > 2048) {

  field.setKey("scriptUri");
  Writer writer = null;
  try {
    String hiveQL = UUID.randomUUID().toString() + ".q";
    File localFile = new File(hiveQL);
    writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(localFile), "UTF-8"));
    writer.write(script);
    writer.flush();
    writer.close();
    writer = null;

    String s3Key = "working/" + hiveQL;
    script = "s3://" + getBucketName() + "/" + s3Key;

    AmazonS3 s3 = getS3Client(null);
    s3.putObject(getBucketName(), s3Key, localFile);
    if (!localFile.delete()) {
      LOGGER.error("Unable to delete temporary file: " + hiveQL);
    }
  }
  catch (IOException e) {
    LOGGER.error(e.getLocalizedMessage(), e);
    throw new RuntimeException(e.getLocalizedMessage(), e);
  }
  finally {
    if (writer != null) {
      try {
        writer.close();
      }
      catch(IOException e) {
        LOGGER.error(e.getLocalizedMessage(), e);
      }
    }
  }
}
else {
  field.setKey("hiveScript");
}
field.setStringValue(script);
fields.add(field);

field = new Field();
field.setKey("workerGroup");
field.setStringValue(EMR_INSTANCE);
fields.add(field);

hadoopQueue
field = new Field();
field.setKey("hadoopQueue");
field.setStringValue("gold");
fields.add(field);

field = new Field();
field.setKey("type");
field.setStringValue("HiveActivity");
fields.add(field);

String hiveId = UUID.randomUUID().toString();
po = new PipelineObject().withName(stepName).withId(hiveId);
po.setFields(fields);
objects.add(po);

putDefReq.setPipelineObjects(objects);
PutPipelineDefinitionResult putPipelineResult = dpl.putPipelineDefinition(putDefReq);
List<ValidationError> errors = putPipelineResult.getValidationErrors();
int errorCount = 0;
if ((errors != null) && (errors.size() > 0)) {
  for (ValidationError error : errors) {
    List<String> errorStrs = error.getErrors();
    for (String errorStr : errorStrs) {
      LOGGER.error(errorStr);
      errorCount++;
    }
  }
}

List<ValidationWarning> warnings = putPipelineResult.getValidationWarnings();
if ((warnings != null) && (warnings.size() > 0)) {
  for (ValidationWarning warning : warnings) {
    List<String> warningStrs = warning.getWarnings();
    for (String warningStr : warningStrs) {
      LOGGER.warn(warningStr);
    }
  }
}

if (errorCount > 0) {
  LOGGER.fatal("BAD STUFF HAPPENED!!!!");
  throw new DataPipelineValidationException("Validation errors detected for hive activity (check log file for details): " +
      "step=" + stepName + "\terror count=" + Integer.toString(errorCount) + "\t" + pipelineId);
}

ActivatePipelineRequest activateReq = new ActivatePipelineRequest();
activateReq.setPipelineId(pipelineId);
dpl.activatePipeline(activateReq);

try {
  Thread.sleep(2500L);
}
catch (InterruptedException e) {
  LOGGER.error(e.getLocalizedMessage(), e);
}

return hiveId;
...