Camel: AWS -S3: тело файла null, при использовании файла из s3 и переходе в Rest api - PullRequest
0 голосов
/ 02 августа 2020

При выполнении Camel AWS Потребитель S3 для чтения файлов из S3 и передачи их в конечную точку Rest завершается ошибкой, поскольку тело файла становится нулевым. Я использовал аналогичный код для использования файла из sftp, он работал. Но когда я использую конечную точку Aws, это не удается. Есть ли какие-либо другие настройки для объекта обмена s3?

Тело файла становится нулевым при использовании файла из s3 и передаче его в Rest API.

Код:

from("aws-s3://test?amazonS3Client=#amazonS3Client&deleteAfterRead=false&delay=5000&synchronous=true&includeBody=true&autocloseBody=false&exchangePattern=InOut")
                   .convertBodyTo(byte[].class)
                    .log(LoggingLevel.INFO, "consuming", "Consumer Fired!")
                    .log(LoggingLevel.INFO, "Replay Message Sent to file:s3out ${in.header.CamelAwsS3Key}")
                    .filter(simple("${in.header.CamelAwsS3Key} contains 'score_input'"))
                   // .to("file:target/s3out?fileName=${in.header.CamelAwsS3Key}")
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            MultipartEntityBuilder multipartEntityBuilder = MultipartEntityBuilder.create();
                            String filename = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
                            File file = exchange.getIn().getBody(File.class);
                            multipartEntityBuilder.addPart("file",
                                    new FileBody(file, ContentType.MULTIPART_FORM_DATA, filename));
                            exchange.getOut().setBody(multipartEntityBuilder.build());
                        }
                    })
                    .to(httpRoute)

Ошибка:

2020-08-02 21:34:57,413 [ws-s3://ds_test] INFO  consuming                      - Consumer Fired!
2020-08-02 21:34:57,414 [ws-s3://ds_test] INFO  route1                         - Replay Message Sent to file:s3out input_0.csv.gz
2020-08-02 21:34:57,415 [ws-s3://ds_test] ERROR DefaultErrorHandler            - Failed delivery for (MessageId: ID-XXXXXX-Mac-49195-1596384290692-0-5 on ExchangeId: ID-XXXXXXX-Mac-49195-1596384290692-0-6). Exhausted after delivery attempt: 1 caught: java.lang.IllegalArgumentException: File may not be null

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route1            ] [route1            ] [aws-s3://test?amazonS3Client=%23amazonS3Client&autocloseBody=false&delay=50] [       218]
[route1            ] [convertBodyTo1    ] [convertBodyTo[byte[]]                                                         ] [       216]
[route1            ] [log1              ] [log                                                                           ] [         0]
[route1            ] [log2              ] [log                                                                           ] [         1]
[route1            ] [filter1           ] [filter[simple{Simple: ${in.header.CamelAwsS3Key} contains 'score_input'}]     ] [         1]
[route1            ] [process1          ] [Processor@0x7aa3628c                                                          ] [         0]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.IllegalArgumentException: File may not be null
        at org.apache.http.util.Args.notNull(Args.java:54) ~[httpcore-4.4.4.jar:4.4.4]
        at org.apache.http.entity.mime.content.FileBody.<init>(FileBody.java:97) ~[httpmime-4.5.1.jar:4.5.1]
        at org.apache.camel.example.cdi.aws.s3.Application$AwsS3Route$1.process(Application.java:101) ~[classes/:?]
        at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:63) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.FilterProcessor.process(FilterProcessor.java:57) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.component.aws.s3.S3Consumer.processBatch(S3Consumer.java:157) [camel-aws-2.18.2.jar:2.18.2]
        at org.apache.camel.component.aws.s3.S3Consumer.poll(S3Consumer.java:101) [camel-aws-2.18.2.jar:2.18.2]
        at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:175) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:102) [camel-core-2.18.2.jar:2.18.2]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:830) [?:?]
2020-08-02 21:34:57,419 [ws-s3://ds_test] WARN  S3Consumer                     - Exchange failed, so rolling back message status: Exchange[ID-XXXXXX-Mac-49195-1596384290692-0-6]
java.lang.IllegalArgumentException: File may not be null
        at org.apache.http.util.Args.notNull(Args.java:54) ~[httpcore-4.4.4.jar:4.4.4]
        at org.apache.http.entity.mime.content.FileBody.<init>(FileBody.java:97) ~[httpmime-4.5.1.jar:4.5.1]
        at org.apache.camel.example.cdi.aws.s3.Application$AwsS3Route$1.process(Application.java:101) ~[classes/:?]
        at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:63) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.FilterProcessor.process(FilterProcessor.java:57) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) ~[camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.component.aws.s3.S3Consumer.processBatch(S3Consumer.java:157) [camel-aws-2.18.2.jar:2.18.2]
        at org.apache.camel.component.aws.s3.S3Consumer.poll(S3Consumer.java:101) [camel-aws-2.18.2.jar:2.18.2]
        at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:175) [camel-core-2.18.2.jar:2.18.2]
        at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:102) [camel-core-2.18.2.jar:2.18.2]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:830) [?:?]

1 Ответ

0 голосов
/ 07 августа 2020
from("timer:CSVReader?period=10s")
          
                    
                    .pollEnrich("aws-s3://test?amazonS3Client=#amazonS3Client&deleteAfterRead=false&delay=5000&synchronous=true&includeBody=true&autocloseBody=false&fileName=")
                   
                         
                            .setHeader("CamelAwsS3ContentType", constant("text/csv"))
                    .log(LoggingLevel.INFO, "RESPONSE Headers ${headers}").end()
                    .log(LoggingLevel.INFO, "consuming", "Consumer Fired!")
                  
                    .log(LoggingLevel.INFO, "Replay Message Sent to file:s3out ${in.header.CamelAwsS3Key}")
                    .filter(simple("${in.header.CamelAwsS3Key} contains 'score_input'"))
                 
                            .unmarshal().gzip()
                            .process(new Processor() {
                                @Override
                                public void process(Exchange exchange) throws Exception {
                                    MultipartEntityBuilder multipartEntityBuilder = MultipartEntityBuilder.create();
                                    String filename = exchange.getIn().getHeader(S3Constants.KEY, String.class);
                                    String body = exchange.getIn().getBody(String.class);
                                    ContentBody cd = new InputStreamBody(new ByteArrayInputStream(body.getBytes()),ContentType.MULTIPART_FORM_DATA, "temp.csv");
                                    multipartEntityBuilder.addPart("file",  cd);
                                    exchange.getOut().setBody(multipartEntityBuilder.build());
                                }
                            })
                    .to(httpRoute
...