Можно ли зарегистрировать файл в распределенном кэше во время FlatMapFunction во Flink? - PullRequest
0 голосов
/ 03 октября 2019

У меня есть FlatMapFunction, которая перечисляет элементы в S3. Я хочу зарегистрировать каждый элемент в распределенном файловом кеше.

Это вообще возможно?

, т. Е. В моей работе:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

...

... = myDataSet.flatMap(new S3Lister(env));

и в S3Listerfile:

...
String id = os.getKey().substring(os.getKey().lastIndexOf('/') + 1);
env.registerCachedFile("s3://" + bucket + os.getKey(), id);
...

, а затем доступ к нему из распределенного кэша в другой пользовательской функции coGroup.

Может ли это работать? Вам даже разрешено передавать ExecutionEnvironment таким образом?

Обновление :

Если нет, каков наилучший способ получить всю корзину S3 в распределенный файловый кешдля использования в работе Flink?

1 Ответ

0 голосов
/ 03 октября 2019

По сути, метод registerCachedFiles помогает загружать файлы при отправке задания. Поэтому невозможно вызвать его в развернутой программе.

Но из вашего описания, почему бы не прочитать файлы S3 напрямую?

...