Оптимизация производительности записи flink s3 - PullRequest
1 голос
/ 28 мая 2020

Я работаю над приложением flink на kubernetes (eks), которое потребляет данные из kafka и записывает их в s3.

У нас около 120 миллионов xml сообщений размером 4 ТБ в kafka. Потребление из кафки происходит очень быстро.

При записи в s3 наблюдается высокое противодавление. Мы даже не достигли предела запросов s3 PUT, который составляет около 3500 запросов в секунду c. Я вижу только 300 операций записи в минуту на S3, что очень медленно.

Я использую StreamFileSink для записи на s3 с политикой Rolling как OnCheckpointPolicy.

Использование flink-s3-fs-had oop - *. Jar и s3: // как путь (без использования s3a или s3p)

Кроме этого, у меня нет никакой конфигурации, связанной с s3

    StreamingFileSink<Tuple3<String,String, String>> sink = StreamingFileSink
            .forRowFormat(new Path(s3://BUCKET),
                    (Tuple3<String,String, String> element, OutputStream stream) -> {
                        PrintStream out = new PrintStream(stream);
                        out.println(element.f2);
                    })
            // Determine component type for each record
            .withBucketAssigner(new CustomBucketAssigner())
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
            .build(); 

Есть что-нибудь что мы можем оптимизировать на s3 из streamfilesink или в flink-conf. xml?

Для контрольных точек я тоже использую s3: // вместо s3p или s3a

env.setStateBackend((StateBackend) new RocksDBStateBackend(s3://checkpoint_bucket, true));
        env.enableCheckpointing(300000);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...