Apache Flink с S3 / EMRFS - PullRequest
       29

Apache Flink с S3 / EMRFS

0 голосов
/ 13 сентября 2018

Всем доброго времени суток,

Я запускаю Flink на AWS EMR с файловым источником в S3 и получаю следующую ошибку: "org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClientException: Failed to sanitize XML document destined for handler class ..."

Похоже, эта ошибка возникает, когда файлов для чтения слишком много (более 100 тысяч).

Согласно этой странице https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html, при работе в EMR нет необходимости выполнять какую-либо настройку для файловой системы S3.

Полная трассировка стека приведена ниже. Я думаю, что эта ошибка на стороне EMR / AWS SDK, а не Flink. Может ли кто-нибудь помочь, пожалуйста?

Спасибо, с уважением, Averell

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: d856c36a05b23981c8b1bb8cbc78182a)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:664)
    at com.nbnco.copper.csa.analysis.chronos.jobs.sdc.StreamingSdc2$.main(StreamingSdc2.scala:300)
    at com.nbnco.copper.csa.analysis.chronos.jobs.sdc.StreamingSdc2.main(StreamingSdc2.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.AWSClientIOException: listObjects() on s3://assn-huyen/PROD_SDC/hist/20180409/1000: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClientException: Failed to sanitize XML document destined for handler class org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListBucketHandler: Failed to sanitize XML document destined for handler class org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListBucketHandler
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:128)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:101)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.Listing$ObjectListingIterator.next(Listing.java:441)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.Listing$FileStatusListingIterator.requestNextBatch(Listing.java:265)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.Listing$FileStatusListingIterator.hasNext(Listing.java:241)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.innerListStatus(S3AFileSystem.java:1409)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:1369)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.listStatus(SafetyNetWrapperFileSystem.java:97)
    at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:707)
    at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
    at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:591)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:366)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:326)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:290)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClientException: Failed to sanitize XML document destined for handler class org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListBucketHandler
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:214)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:298)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsUnmarshaller.unmarshall(Unmarshallers.java:70)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsUnmarshaller.unmarshall(Unmarshallers.java:59)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1494)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1217)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4194)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4141)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4135)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.listObjects(AmazonS3Client.java:838)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.listNextBatchOfObjects(AmazonS3Client.java:905)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.listNextBatchOfObjects(AmazonS3Client.java:883)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.continueListObjects(S3AFileSystem.java:929)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.Listing$ObjectListingIterator.next(Listing.java:437)
    ... 19 more
Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:3664)
    at java.lang.String.<init>(String.java:207)
    at java.lang.StringBuilder.toString(StringBuilder.java:407)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:197)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:298)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsUnmarshaller.unmarshall(Unmarshallers.java:70)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsUnmarshaller.unmarshall(Unmarshallers.java:59)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1494)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1217)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4194)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4141)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4135)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.listObjects(AmazonS3Client.java:838)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.listNextBatchOfObjects(AmazonS3Client.java:905)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.listNextBatchOfObjects(AmazonS3Client.java:883)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.continueListObjects(S3AFileSystem.java:929)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.Listing$ObjectListingIterator.next(Listing.java:437)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.Listing$FileStatusListingIterator.requestNextBatch(Listing.java:265)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.Listing$FileStatusListingIterator.hasNext(Listing.java:241)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.innerListStatus(S3AFileSystem.java:1409)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:1369)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157)
...