Невозможно получить доступ к файлу s3 из Flink в AWS EMR - PullRequest
0 голосов
/ 06 января 2020

У нас есть долгосрочный кластер Flink на AWS EMR. Это настроено с ролями по умолчанию (EMR_EC2_DefaultRole). Мы пытаемся запустить задание Flink, но оно не может получить доступ к корзине s3 для чтения файла. Мы создали минимальный основной код метода для его воспроизведения:

String filePath = "s3://<our-bucket>/<the-file>";
logger.info("Path: " + filePath);
Path path = Paths.get(filePath);
logger.info("Successfully got path");
File file = path.toFile();
logger.info("Successfully got creds file");
logger.info("Exists [{}], isFile [{}] ", file.exists(), file.isFile());
String content = FileUtils.readFileToString(file);
logger.info("Content [{}]", content);

Мы запускаем задание Flink через веб-интерфейс Flink. Мы получаем все журналы, кроме журнала Content.

Журнал существует: Exists [false], isFile [false]

Мы также получаем следующую ошибку:

Caused by: java.io.FileNotFoundException: File 's3:/<our-bucket>/<the-file>' does not exist
    at org.apache.commons.io.FileUtils.openInputStream(FileUtils.java:299)
    at org.apache.commons.io.FileUtils.readFileToString(FileUtils.java:1711)
    at org.apache.commons.io.FileUtils.readFileToString(FileUtils.java:1748)
    at com.<our-package>.Main.main(Main.java:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    ... 10 more

Когда мы sh обращаемся к главному экземпляру EC2 и запускаем Следующая команда работает и возвращает содержимое файла:

sudo hdfs dfs -cat s3://<our-bucket>/<the-file>

Пожалуйста, помогите:)

1 Ответ

0 голосов
/ 10 января 2020

Похоже, вы пытаетесь передать путь S3 к org.apache.commons.io.FileUtils.readFileToString(), что, я думаю, не сработает.

Вы можете создать Flink Path из этого пути S3 и использовать его создать входной поток, например

Path = new Path("s3://<our-bucket>/<the-file>");
FileSystem fs = filePath.getFileSystem();
InputStream is = new DataInputStream(fs.open(filePath, readBufferSize));
String s = IOUtils.toString(is, charset);
...