Apache Camel: загрузка нескольких файлов одновременно с использованием компонента SFTP - PullRequest
0 голосов
/ 18 марта 2020

на sftp У меня есть несколько файлов со следующими именами xyz:

40_20200313_0cd6963f-bf5b-4eb0-b310-255a23ed778e_p.dat
123_20200313_0cd6963f-bf5b-4eb0-b310-255a23ed778e_p.dat
etc.

Я хочу, чтобы верблюд загрузил все файлы одновременно, поскольку в настоящее время он загружает файл один за другим.

Ниже следует верблюжий маршрут и запрос:

    private static String regex() {
        return "(22|23|24|25|26|28|29|32|35|40|41|46|52|70|85|88|123)_(?:.*)_p.dat";
    }

    private static String sftpComponent() {
        return "sftp://transit.ergogroup.no/Eyeshare/From_Eyeshare_Test"
                + "?username=Eyeshare_test"
                + "&password=epw3ePOugG" // Stored on wildfly server
                + "&download=true" //Shall be read chunk by chunk to avoid heap space issues. Earlier download=true was used: Harpreet
                + "&useList=true"
                + "&stepwise=false"
                + "&disconnect=true"
                + "&passiveMode=true"
                + "&reconnectDelay=10000"
//              + "&bridgeErrorHandler=true"
                + "&delay=300000"
                //+ "&fileName=" + sftpFileName
//              + "&include=kiki\\.txt"
//              + "&include=40_*_p\\.dat"sss
                + "&include="+regex()
                + "&preMove=$simple{file:onlyname}.$simple{date:now:yyyy-MM-dd'T'hh-mm-ss}.processing"
                + "&move=$simple{file:onlyname.noext}.$simple{date:now:yyyy-MM-dd'T'hh-mm-ss}.success"
                + "&moveFailed=$simple{file:onlyname.noext}.$simple{date:now:yyyy-MM-dd'T'hh-mm-ss}.failed";
//              + "&idempotentRepository=#infinispan"
//              + "&readLockRemoveOnCommit=true";
    }

   from(sftpComponent()).log("CHU").to(archiveReceivedFile())

Код выглядит нормально, но вывод - нет. Любой, пожалуйста, предложите

1 Ответ

0 голосов
/ 20 марта 2020

Вот пример агрегатора :

from("file:///somePath/consume/?maxMessagesPerPoll=2&delay=5000")
            .aggregate(constant(true), new ZipAggregationStrategy()).completion(exchange -> exchange.getProperty("CamelBatchComplete", Boolean.class))
            .to("file:///somePath/produce/")

Здесь maxMessagesPerPoll определяет, сколько файлов будет заархивировано. Но если их количество в папке меньше значения maxMessagesPerPoll, он будет ожидать пропущенных файлов для полного архива. Вот пример ZipAggregationStrategy:

private static class ZipAggregationStrategy implements AggregationStrategy {
    private ZipOutputStream zipOutputStream;
    private ByteArrayOutputStream out;
    @Override
    public Exchange aggregate(final Exchange oldExchange, final Exchange newExchange) {
        try {
            if (oldExchange == null) {
                out = new ByteArrayOutputStream();
                zipOutputStream = new ZipOutputStream(out);
            }
            createEntry(newExchange);
            return newExchange;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    private void createEntry(final Exchange exchange) throws Exception {
        final ZipEntry zipEntry = new ZipEntry(exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
        zipOutputStream.putNextEntry(zipEntry);
        byte[] bytes = new byte[1024];
        int length;
        try (InputStream body = exchange.getIn().getBody(InputStream.class)) {
            while ((length = body.read(bytes)) >= 0) {
                zipOutputStream.write(bytes, 0, length);
            }
        }
    }
    @Override
    public void onCompletion(final Exchange exchange) {
        try {
            zipOutputStream.close();
            exchange.getIn().setBody(new ByteArrayInputStream(out.toByteArray()));
            exchange.getIn().setHeader(Exchange.FILE_NAME, "someArchive.zip");
        }catch (Exception e){
            throw new RuntimeException(e);
        } finally {
            try {
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Это пример в памяти. Вы можете улучшить это, например, с помощью временного файла. И вы всегда можете создать свой собственный предикат завершения на основе вашей логики c.

UPD: я думаю, что ссылка на документацию временно недоступна

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...