У нас есть долгосрочный кластер Flink на AWS EMR. Это настроено с ролями по умолчанию (EMR_EC2_DefaultRole). Мы пытаемся запустить задание Flink, но оно не может получить доступ к корзине s3 для чтения файла. Мы создали минимальный основной код метода для его воспроизведения:
String filePath = "s3://<our-bucket>/<the-file>";
logger.info("Path: " + filePath);
Path path = Paths.get(filePath);
logger.info("Successfully got path");
File file = path.toFile();
logger.info("Successfully got creds file");
logger.info("Exists [{}], isFile [{}] ", file.exists(), file.isFile());
String content = FileUtils.readFileToString(file);
logger.info("Content [{}]", content);
Мы запускаем задание Flink через веб-интерфейс Flink. Мы получаем все журналы, кроме журнала Content
.
Журнал существует: Exists [false], isFile [false]
Мы также получаем следующую ошибку:
Caused by: java.io.FileNotFoundException: File 's3:/<our-bucket>/<the-file>' does not exist
at org.apache.commons.io.FileUtils.openInputStream(FileUtils.java:299)
at org.apache.commons.io.FileUtils.readFileToString(FileUtils.java:1711)
at org.apache.commons.io.FileUtils.readFileToString(FileUtils.java:1748)
at com.<our-package>.Main.main(Main.java:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
... 10 more
Когда мы sh обращаемся к главному экземпляру EC2 и запускаем Следующая команда работает и возвращает содержимое файла:
sudo hdfs dfs -cat s3://<our-bucket>/<the-file>
Пожалуйста, помогите:)