Я работаю над приложением flink на kubernetes (eks), которое потребляет данные из kafka и записывает их в s3.
У нас около 120 миллионов xml сообщений размером 4 ТБ в kafka. Потребление из кафки происходит очень быстро.
При записи в s3 наблюдается высокое противодавление. Мы даже не достигли предела запросов s3 PUT, который составляет около 3500 запросов в секунду c. Я вижу только 300 операций записи в минуту на S3, что очень медленно.
Я использую StreamFileSink для записи на s3 с политикой Rolling как OnCheckpointPolicy.
Использование flink-s3-fs-had oop - *. Jar и s3: // как путь (без использования s3a или s3p)
Кроме этого, у меня нет никакой конфигурации, связанной с s3
StreamingFileSink<Tuple3<String,String, String>> sink = StreamingFileSink
.forRowFormat(new Path(s3://BUCKET),
(Tuple3<String,String, String> element, OutputStream stream) -> {
PrintStream out = new PrintStream(stream);
out.println(element.f2);
})
// Determine component type for each record
.withBucketAssigner(new CustomBucketAssigner())
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
.build();
Есть что-нибудь что мы можем оптимизировать на s3 из streamfilesink или в flink-conf. xml?
Для контрольных точек я тоже использую s3: // вместо s3p или s3a
env.setStateBackend((StateBackend) new RocksDBStateBackend(s3://checkpoint_bucket, true));
env.enableCheckpointing(300000);