Spring-интеграция с AWS S3 повторная попытка не удалось получить - PullRequest
0 голосов
/ 24 сентября 2018

Мы используем пружинную интеграцию с S3.У нас есть s3-inbound-streaming-channel-adapter для чтения из S3.Происходит следующее: если метод get завершается неудачно, s3-inbound-streaming-channel-adapter помещает имя файла в "acceptOnceFilter" и не повторяет попытку при сбое.

Q1.То, что мы хотим, это когда s3-inbound-streaming-channel-adapter «получает» файл от S3 и говорит, что по какой-то причине это «get» завершается ошибкой ... как мы можем получить s3-inbound-streaming-channel-adapter дляповторить этот запрос "get" еще раз для того же файла?

Q2.При ошибке исключение отправляется в defaultChannel по умолчанию из s3-inbound-streaming-channel-adapter.Будет ли Сообщение в исключении содержать «имя файла», которое не удалось?

<int:channel id="s3FileProcessingChannel">
  <int:queue capacity="15"/>
</int:channel>

<bean id="metadataStore" class="org.springframework.integration.metadata.SimpleMetadataStore"/>

<bean id="acceptOnceFilter"
  class="org.springframework.integration.aws.support.filters.S3PersistentAcceptOnceFileListFilter">
  <constructor-arg index="0" ref="metadataStore"/>
  <constructor-arg index="1" value="streaming"/>
</bean>

<int-aws:s3-inbound-streaming-channel-adapter id="s3Region1"
channel="s3FileProcessingChannel" 
session-factory="s3SessionFactory"
filter="acceptOnceFilter"
remotedirectoryexpression="'${s3.sourceBucket}/emm'">

  <int:poller fixed-delay="1000" max-messages-per-poll="15"/>
</int-aws:s3-inbound-streaming-channel-adapter>

Спасибо GM

1 Ответ

0 голосов
/ 24 сентября 2018

S3PersistentAcceptOnceFileListFilter реализует:

/**
 * A {@link FileListFilter} that can be reset by removing a specific file from its
 * state.
 * @author Gary Russell
 * @since 4.1.7
 *
 */
public interface ResettableFileListFilter<F> extends FileListFilter<F> {

    /**
     * Remove the specified file from the filter so it will pass on the next attempt.
     * @param f the element to remove.
     * @return true if the file was removed as a result of this call.
     */
    boolean remove(F f);

}

И S3StreamingMessageSource заполняет заголовки следующим образом:

return getMessageBuilderFactory()
                    .withPayload(session.readRaw(remotePath))
                    .setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
                    .setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
                    .setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
                    .setHeader(FileHeaders.REMOTE_FILE_INFO,
                            this.fileInfoJson ? file.toJson() : file);

Когда произошла ошибка, вам просто нужно использовать эту FileHeaders.REMOTE_FILE для вызоваупомянутое выше remove() и ваш сбойный файл будет получен с S3 в следующем цикле опроса.

...