Я настроил свой кластер EMR для второй очереди Hadoop. Очередь «по умолчанию» и та, которую я назвал «золото».
Очередь присутствует и работает, когда мой кластер EMR запускается, как я вижу в веб-интерфейсе Hadoop для Yarn.
Мой код Java создает конвейерное задание (в данном случае HiveActivity), и я добавляю список объектов Field в объект Pipeline. Эти поля являются настройками для конвейера.
Одно из таких полей (необязательно в соответствии с документами на https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-hiveactivity.html) - это 'hadoopQueue'. Когда я не устанавливаю это поле, по умолчанию используется очередь 'default'. Задание завершается, и все хорошо .
Когда я устанавливаю поле с этим кодом
//hadoopQueue
field = new Field();
field.setKey("hadoopQueue");
field.setStringValue("gold");
fields.add(field);
Задание не завершено, и в моих журналах конвейера данных появляется следующая ошибка
Exception in thread "main" java.lang.RuntimeException: Local file does not exist.
at com.amazon.elasticmapreduce.scriptrunner.ScriptRunner.fetchFile(ScriptRunner.java:30)
at com.amazon.elasticmapreduce.scriptrunner.ScriptRunner.main(ScriptRunner.java:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:239)
at org.apache.hadoop.util.RunJar.main(RunJar.java:153)
Я также получаю эту ошибку, если я явно устанавливаю для очереди значение «по умолчанию», например
//hadoopQueue
field = new Field();
field.setKey("hadoopQueue");
field.setStringValue("default");
fields.add(field);
Кажется, что это работает, только если я не установил эту опцию, но не использовал ее, это означает, что я не могу указать другую очередь.
Кто-нибудь смог успешно использовать опцию 'hadoopQueue' таким образом?
public static String scheduleDataPipelineActivity(DataPipeline dpl, String pipelineId, String script, String stepName,
Map<String,String> params) {
PutPipelineDefinitionRequest putDefReq = new PutPipelineDefinitionRequest();
putDefReq.setPipelineId(pipelineId);
List<PipelineObject> objects = new ArrayList<>();
List<Field> fields = new ArrayList<>();
Field field = new Field();
field.setKey("failureAndRerunMode");
field.setStringValue("CASCADE");
fields.add(field);
field = new Field();
field.setKey("resourceRole");
field.setStringValue("DataPipelineDefaultResourceRole");
fields.add(field);
field = new Field();
field.setKey("role");
field.setStringValue("DataPipelineDefaultRole");
fields.add(field);
field = new Field();
field.setKey("pipelineLogUri");
field.setStringValue("s3://" + getBucketName() + "/logs");
fields.add(field);
field = new Field();
field.setKey("scheduleType");
field.setStringValue("ONDEMAND");
fields.add(field);
if ((params != null) && (params.size() > 0)) {
for (Map.Entry<String,String> entry : params.entrySet()) {
field = new Field();
field.setKey("scriptVariable");
field.setStringValue(entry.getKey() + "=" + entry.getValue());
fields.add(field);
}
}
PipelineObject po = new PipelineObject().withName("Default").withId("Default");
po.setFields(fields);
objects.add(po);
fields = new ArrayList<>();
field = new Field();
field.setKey("stage");
field.setStringValue("false");
fields.add(field);
field = new Field();
if (script.startsWith("s3://")) {
field.setKey("scriptUri");
}
else if (script.length() > 2048) {
field.setKey("scriptUri");
Writer writer = null;
try {
String hiveQL = UUID.randomUUID().toString() + ".q";
File localFile = new File(hiveQL);
writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(localFile), "UTF-8"));
writer.write(script);
writer.flush();
writer.close();
writer = null;
String s3Key = "working/" + hiveQL;
script = "s3://" + getBucketName() + "/" + s3Key;
AmazonS3 s3 = getS3Client(null);
s3.putObject(getBucketName(), s3Key, localFile);
if (!localFile.delete()) {
LOGGER.error("Unable to delete temporary file: " + hiveQL);
}
}
catch (IOException e) {
LOGGER.error(e.getLocalizedMessage(), e);
throw new RuntimeException(e.getLocalizedMessage(), e);
}
finally {
if (writer != null) {
try {
writer.close();
}
catch(IOException e) {
LOGGER.error(e.getLocalizedMessage(), e);
}
}
}
}
else {
field.setKey("hiveScript");
}
field.setStringValue(script);
fields.add(field);
field = new Field();
field.setKey("workerGroup");
field.setStringValue(EMR_INSTANCE);
fields.add(field);
hadoopQueue
field = new Field();
field.setKey("hadoopQueue");
field.setStringValue("gold");
fields.add(field);
field = new Field();
field.setKey("type");
field.setStringValue("HiveActivity");
fields.add(field);
String hiveId = UUID.randomUUID().toString();
po = new PipelineObject().withName(stepName).withId(hiveId);
po.setFields(fields);
objects.add(po);
putDefReq.setPipelineObjects(objects);
PutPipelineDefinitionResult putPipelineResult = dpl.putPipelineDefinition(putDefReq);
List<ValidationError> errors = putPipelineResult.getValidationErrors();
int errorCount = 0;
if ((errors != null) && (errors.size() > 0)) {
for (ValidationError error : errors) {
List<String> errorStrs = error.getErrors();
for (String errorStr : errorStrs) {
LOGGER.error(errorStr);
errorCount++;
}
}
}
List<ValidationWarning> warnings = putPipelineResult.getValidationWarnings();
if ((warnings != null) && (warnings.size() > 0)) {
for (ValidationWarning warning : warnings) {
List<String> warningStrs = warning.getWarnings();
for (String warningStr : warningStrs) {
LOGGER.warn(warningStr);
}
}
}
if (errorCount > 0) {
LOGGER.fatal("BAD STUFF HAPPENED!!!!");
throw new DataPipelineValidationException("Validation errors detected for hive activity (check log file for details): " +
"step=" + stepName + "\terror count=" + Integer.toString(errorCount) + "\t" + pipelineId);
}
ActivatePipelineRequest activateReq = new ActivatePipelineRequest();
activateReq.setPipelineId(pipelineId);
dpl.activatePipeline(activateReq);
try {
Thread.sleep(2500L);
}
catch (InterruptedException e) {
LOGGER.error(e.getLocalizedMessage(), e);
}
return hiveId;