Реализация пользовательской стратегии процесса с помощью Apache Camel File Component - PullRequest
1 голос
/ 10 марта 2011

Проблемный фон

В настоящее время я работаю над приложением ETL на основе верблюдов, которое обрабатывает группы файлов по мере их появления в устаревшем каталоге. Файлы должны обрабатываться вместе как группа, определяемая началом имени файла. Файлы могут быть обработаны только после того, как готовый файл (".flag") будет записан в каталог. Я знаю, что в компоненте верблюжьего файла есть опция готового файла, но это позволяет вам получать только файлы с тем же именем, что и готовый файл. Приложение должно работать непрерывно и начать опрос каталога следующего дня, когда дата катится.

Пример структуры каталогов:

 /process-directory
      /03-09-2011
      /03-10-2011
           /GROUPNAME_ID1_staticfilename.xml
           /GROUPNAME_staticfilename2.xml
           /GROUPNAME.flag
           /GROUPNAME2_ID1_staticfilename.xml
           /GROUPNAME2_staticfilename2.xml
           /GROUPNAME2_staticfilename3.xml
           /GROUPNAME2.flag


Попытки пока что

У меня есть следующий маршрут (запутанные имена), с которого начинается обработка:

@Override
public void configure() throws Exception 
{
    getContext().addEndpoint("processShare", createProcessShareEndpoint());

    from("processShare")
        .process(new InputFileRouter())
        .choice()
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE1 + "'")
                .to("seda://type1?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE2 + "'")
                .to("seda://type2?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE3 + "'")
                .to("seda://type3?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE4 + "'")
                .to("seda://type4?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE5 + "'")
                .to("seda://type5?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE6 + "'")
                .to("seda://type6?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE7 + "'")
                .to("seda://type7?size=1")
            .otherwise()
                .log(LoggingLevel.FATAL, "Unknown file type encountered during processing! --> ${body}");
}


Мои проблемы связаны с настройкой конечной точки файла. В настоящее время я пытаюсь программно настроить конечную точку без особой удачи. До сих пор мой опыт работы с верблюдом был преимущественно при использовании Spring DSL, а не Java DSL.

Я пошел по пути создания экземпляра объекта FileEndpoint, но всякий раз, когда создается маршрут, я получаю сообщение об ошибке, говорящее о том, что свойство файла имеет значение null. Я считаю, что это потому, что я должен создавать FileComponent, а не конечную точку. Я не создаю конечную точку без использования URI, потому что я не могу указать динамическую дату в имени каталога, используя URI.

private FileEndpoint createProcessShareEndpoint() throws ConfigurationException
    {
        FileEndpoint endpoint = new FileEndpoint();

        //Custom directory "ready to process" implementation.
        endpoint.setProcessStrategy(getContext().getRegistry().lookup(
                "inputFileProcessStrategy", MyFileInputProcessStrategy.class));

        try 
        {
            //Controls the number of files returned per directory poll.
            endpoint.setMaxMessagesPerPoll(Integer.parseInt(
                    PropertiesUtil.getProperty(
                            AdapterConstants.OUTDIR_MAXFILES, "1")));
        } 
        catch (NumberFormatException e) 
        {
            throw new ConfigurationException(String.format(
                    "Property %s is required to be an integer.", 
                    AdapterConstants.OUTDIR_MAXFILES), e);
        }

        Map<String, Object> consumerPropertiesMap = new HashMap<String, Object>();

        //Controls the delay between directory polls.
        consumerPropertiesMap.put("delay", PropertiesUtil.getProperty(
                AdapterConstants.OUTDIR_POLLING_MILLIS));

        //Controls which files are included in directory polls.
        //Regex that matches file extensions (eg. {SOME_FILE}.flag)
        consumerPropertiesMap.put("include", "^.*(." + PropertiesUtil.getProperty(
                AdapterConstants.OUTDIR_FLAGFILE_EXTENSION, "flag") + ")");

        endpoint.setConsumerProperties(consumerPropertiesMap);

        GenericFileConfiguration configuration = new GenericFileConfiguration();

        //Controls the directory to be polled by the endpoint.
        if(CommandLineOptions.getInstance().getInputDirectory() != null)
        {
            configuration.setDirectory(CommandLineOptions.getInstance().getInputDirectory());
        }
        else
        {
            SimpleDateFormat dateFormat = new SimpleDateFormat(PropertiesUtil.getProperty(AdapterConstants.OUTDIR_DATE_FORMAT, "MM-dd-yyyy"));

            configuration.setDirectory(
                    PropertiesUtil.getProperty(AdapterConstants.OUTDIR_ROOT) + "\\" +
                    dateFormat.format(new Date()));
        }

        endpoint.setConfiguration(configuration);

        return endpoint;


Вопросы

  1. Является ли реализация GenericFileProcessingStrategy правильным решением в этой ситуации? Если да, то есть ли где-нибудь пример этого? Я просмотрел юнит-тесты верблюжьего файла и не увидел ничего, что выскочило на меня.

  2. Что я делаю не так с настройкой конечной точки? Я чувствую, что ответ на решение этой проблемы связан с вопросом 3.

  3. Можно ли настроить конечную точку файла для прокрутки датированных папок при опросе и изменении даты?

Как всегда, спасибо за помощь.

1 Ответ

2 голосов
/ 12 марта 2011

Вы можете обратиться к пользовательской ProcessStrategy из конечной точки uri, используя опцию processStrategy, например, file: xxxx? ProcessStrategy = # myProcess.Обратите внимание, как мы добавляем к значению префикс #, чтобы указать, что он должен искать его в реестре.Так что в Spring XML вы просто добавляетеtag

В Java, вероятно, проще получить конечную точку из API CamelContext:

FileEndpoint file = context.getEndpoint("file:xxx?aaa=123&bbb=456", FileEndpoint.class);

Это позволяет предварительно настроить конечную точку.И, конечно, впоследствии вы можете использовать API в FileEndpoint для установки других конфигураций.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...