Flink - AWSS3IOException в AWS EMR, вызванная BucketingSink с S3A - PullRequest
0 голосов
/ 05 декабря 2018

У меня есть приложение Flink с высоким параллелизмом (400), работающее в AWS EMR.Он получает Kafka и опускается на S3, используя BucketingSink (используя RocksDb для проверки контрольных точек).Место назначения определяется с помощью префикса "s3a: //".Задание Flink - это потоковое приложение, которое работает непрерывно.В любой момент времени все работники вместе могут сгенерировать / записать до 400 файлов (из-за параллелизма 400).Через несколько дней один из рабочих выйдет из строя за исключением:

org.apache.hadoop.fs.s3a.AWSS3IOException: copyFile(bucket/2018-09-01/05/_file-10-1.gz.in-progress, bucket/2018-09-01/05/_file-10-1.gz.pending): com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Pelase try again. (Service: Amazon S3; Status Code: 200 InternalError; Request ID: xxxxxxxxxx; S3 Extended Request ID: yyyyyyyyyyyyyyy
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java: 178)
at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java: 1803)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:776)
at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:662)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:575)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)

Похоже, что это происходит случайно, когда BucketingSink создает новый файл детали.Странно то, что это происходит случайным образом, и когда это происходит, это происходит с одним из работников, работающих параллельно, (не со всеми).Кроме того, когда это происходит, задание Flink переходит в состояние FAILING, но задание Flink не перезапускается и не возобновляет / восстанавливает с последней успешной контрольной точки.Какова причина этого и как это должно быть решено?Кроме того, как можно настроить задание на перезапуск / восстановление с последней успешной контрольной точки, а не оставаться в состоянии ОТКАЗ?

1 Ответ

0 голосов
/ 03 января 2019

Я думаю, что это известное поведение с приемником в контейнере и S3, и предлагаемое решение - использовать блестящую новую StreamingFileSink в Flink 1.7.0.

По сути, приемник с накоплением ожидает, что запись и переименование будут происходить немедленно, как в реальной файловой системе, но это не является хорошим предположением для хранилищ объектов, таких как S3, поэтому приемник с накоплением заканчивается в условиях гонки.которые вызывают периодические проблемы.Вот билет JIRA, который как бы описывает проблему, и связанные с ней заявки еще больше подчеркивают ее. JIRA FLINK-9752

...