Spring Integration AWS s3-inbound-streaming-channel-adapter поток из нескольких сегментов s3 - PullRequest
0 голосов
/ 02 июня 2018

Я использую пружинную интеграцию на основе XML и использую s3-inbound-streaming-channel-adapter для потоковой передачи из одного s3 сегмента.

Теперь у нас есть требование для потоковой передачи из two s3 сегмента.

Так возможно ли для s3-inbound-streaming-channel-adapter потоковой передачи из нескольких сегментов?

Или мне нужно создать отдельное s3-inbound-streaming-channel-adapter для каждого s3 сегмента?

Это мой текущий набор для одного сегмента s3, и он работает.

<int-aws:s3-inbound-streaming-channel-adapter 
channel="s3Channel"
session-factory="s3SessionFactory" 
filter="acceptOnceFilter"
remote-directory-expression="'bucket-1'">
    <int:poller fixed-rate="1000"/>
</int-aws:s3-inbound-streaming-channel-adapter>

Заранее спасибо.

ОБНОВЛЕНИЕ:

В итоге у меня два s3-inbound-streaming-channel-adapter, как указано Артем Билан ниже.

Однако для каждого входящего адаптера мне пришлось объявить экземпляры acceptOnceFilter и metadataStore отдельно.

Это потому, что если у меня был только один экземпляр acceptOnceFilter и metadataStore, и они были общими для двух входящих адаптеров, то начиналось какое-то странное зацикливание.

например Когдаfile_1.csv поступил в bucket-1 и был обработан, а затем, если вы поместите тот же файл file_1.csv в bucket-2 , то начнется странная зацикливание.Не знаю почему!В итоге я создал acceptOnceFilter и metadataStore для каждого входящего адаптера.

`

    <!-- ===================================================== -->
    <!-- Region 1 s3-inbound-streaming-channel-adapter setting -->
    <!-- ===================================================== -->

    <bean id="metadataStore" class="org.springframework.integration.metadata.SimpleMetadataStore"/>

    <bean id="acceptOnceFilter"
          class="org.springframework.integration.aws.support.filters.S3PersistentAcceptOnceFileListFilter">
        <constructor-arg index="0" ref="metadataStore"/>
        <constructor-arg index="1" value="streaming"/>
    </bean>

    <int-aws:s3-inbound-streaming-channel-adapter id="s3Region1"
                                                  channel="s3Channel"
                                                  session-factory="s3SessionFactory"
                                                  filter="acceptOnceFilter"
                                                  remote-directory-expression="'${s3.bucketOne.name}'">
        <int:poller fixed-rate="1000"/>
    </int-aws:s3-inbound-streaming-channel-adapter>

    <int:channel id="s3Channel">
        <int:queue capacity="50"/>
    </int:channel>

    <!-- ===================================================== -->
    <!-- Region 2 s3-inbound-streaming-channel-adapter setting -->
    <!-- ===================================================== -->

    <bean id="metadataStoreRegion2" class="org.springframework.integration.metadata.SimpleMetadataStore"/>

    <bean id="acceptOnceFilterRegion2"
          class="org.springframework.integration.aws.support.filters.S3PersistentAcceptOnceFileListFilter">
        <constructor-arg index="0" ref="metadataStoreRegion2"/>
        <constructor-arg index="1" value="streaming"/>
    </bean>

    <int-aws:s3-inbound-streaming-channel-adapter id="s3Region2"
                                                  channel="s3ChannelRegion2"
                                                  session-factory="s3SessionFactoryRegion2"
                                                  filter="acceptOnceFilterRegion2"
                                                  remote-directory-expression="'${s3.bucketTwo.name}'">
        <int:poller fixed-rate="1000"/>
    </int-aws:s3-inbound-streaming-channel-adapter>

    <int:channel id="s3ChannelRegion2">
        <int:queue capacity="50"/>
    </int:channel>

`

1 Ответ

0 голосов
/ 04 июня 2018

Это верно, текущая реализация поддерживает только один remote directory для периодического опроса.В настоящий момент мы действительно работаем над тем, чтобы формализовать такое решение, как готовая функция.Похожий запрос был получен для поддержки (S) FTP, особенно когда целевой каталог не известен заранее во время настройки.

Если это не имеет большого значения для вас, чтобы настроить несколько канальных адаптеров для каждого длякаталог, это было бы здорово.Вы всегда можете отправлять сообщения от них на один и тот же канал для обработки.

В противном случае вы можете рассмотреть возможность зацикливания списка сегментов с помощью:

  <xsd:attribute name="remote-directory-expression" type="xsd:string">
        <xsd:annotation>
            <xsd:documentation>
                Specify a SpEL expression which will be used to evaluate the directory
                path to where the files will be transferred
                (e.g., "headers.['remote_dir'] + '/myTransfers'" for outbound endpoints)
                There is no root object (message) for inbound endpoints
                (e.g., "@someBean.fetchDirectory");
            </xsd:documentation>
        </xsd:annotation>
    </xsd:attribute>

в некотором компоненте.

...