Flink StreamingFileSink не записывает данные в AWS S3 - PullRequest
0 голосов
/ 20 января 2020

У меня есть коллекция, которая представляет поток данных и тестирует StreamingFileSink для записи потока в S3. Программа успешно запущена, но в указанном пути S3 нет данных.

    public class S3Sink {

    public static void main(String args[]) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.enableCheckpointing(100);

        List<String> input = new ArrayList<>();
        input.add("test");

        DataStream<String> inputStream = see.fromCollection(input);

        RollingPolicy<Object, String> rollingPolicy = new CustomRollingPolicy();

        StreamingFileSink s3Sink = StreamingFileSink.
                forRowFormat(new Path("<S3 Path>"),
                new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(rollingPolicy)
                .build();


        inputStream.addSink(s3Sink);

        see.execute();
    }
}

Контрольная точка также включена. Есть какие-нибудь мысли о том, почему Sink не работает должным образом?

ОБНОВЛЕНИЕ: На основании ответа Дэвида создан пользовательский источник, который непрерывно генерирует случайную строку, и я ожидаю, что контрольная точка сработает через заданный интервал для записи данных в S3.

public class S3SinkCustom {

    public static void main(String args[]) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.enableCheckpointing(1000);

        DataStream<String> inputStream = see.addSource(new CustomSource());

        RollingPolicy<Object, String> rollingPolicy = new CustomRollingPolicy();

        StreamingFileSink s3Sink = StreamingFileSink.
                forRowFormat(new Path("s3://mybucket/data/"),
                new SimpleStringEncoder<>("UTF-8"))
                .build();


        //inputStream.print();

        inputStream.addSink(s3Sink);

        see.execute();
    }

    static class CustomSource extends RichSourceFunction<String> {

        private volatile boolean running = false;

        final String[] strings = {"ABC", "XYZ", "DEF"};

        @Override
        public void open(Configuration parameters){
            running = true;
        }

        @Override
        public void run(SourceContext sourceContext) throws Exception {
            while (running) {
                Random random = new Random();
                int index = random.nextInt(strings.length);
                sourceContext.collect(strings[index]);
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

}

Тем не менее, в s3 нет данных, и Flink Process даже не проверяет, является ли корзина S3 действительной или нет, но процесс запущен без проблем.

Обновление:

Ниже приведены подробные сведения о пользовательской скользящей политике:

public class CustomRollingPolicy implements RollingPolicy<Object, String> {

    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo partFileInfo) throws IOException {
        return partFileInfo.getSize() > 1;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo partFileInfo, Object o) throws IOException {
        return true;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo partFileInfo, long l) throws IOException {
        return true;
    }
}

Ответы [ 2 ]

0 голосов
/ 27 января 2020

Эта проблема решена после настройки flink-conf.yaml с необходимыми свойствами s3a, такими как fs.s3a.access.key, fs.s3a.secret.key.

Нам нужно сообщить Flink о расположение конфигурации.

FileSystem.initialize (GlobalConfiguration.loadConfiguration (""));

С этими изменениями я смог запустить приемник S3 из локального хранилища, и сообщения сохранялись на S3 без каких-либо вопросы.

0 голосов
/ 20 января 2020

Я полагаю, что проблема заключается в том, что написанное вами задание не будет выполняться достаточно долго для фактической проверки, поэтому вывод не будет завершен.

Другая потенциальная проблема заключается в том, что StreamingFileSink работает только с файловой системой S3 на основе Had oop (а не с Presto).

...