Синхронизируйте S3 Bucket и слушайте изменения - PullRequest
0 голосов
/ 23 октября 2019

У меня есть корзина AWS S3, где я еженедельно размещаю новый ZIP-файл.

Я хочу добавить функциональность в мою существующую веб-службу, написанную с помощью Spring Boot: синхронизировать корзину локально и следить за изменениями.

В настоящее время синхронизация работает хорошо: всякий раз, когда новаяфайл добавляется в корзину, он загружается локально. Тем не менее, я не знаю, чтобы прослушивать обновления файлов, это метод, который запускается, когда новый файл загружается локально. Можно ли это сделать?

Это кусок кода, который у меня есть:

#  --------
# | AWS S3 |
#  --------
s3.credentials-access-key=***
s3.credentials-secret-key=****
s3.bucket = my-bucket
s3.remote-dir = zips
s3.local-dir = D:/s3-bucket/
@Log4j2
@Configuration
public class S3Config {

    public static final String OUT_CHANNEL_NAME = "s3filesChannel";

    @Value("${s3.credentials-access-key}") private String accessKey;
    @Value("${s3.credentials-secret-key}") private String secretKey;
    @Value("${s3.remote-dir}") private String remoteDir;
    @Value("${s3.bucket}") private String s3bucket;
    @Value("${s3.local-dir}") private String localDir;

    /*
     * AWS S3
     */
    @Bean
    public AmazonS3 getAmazonS3(

    ){
        BasicAWSCredentials creds = new BasicAWSCredentials(accessKey, secretKey);
        AmazonS3 s3client = AmazonS3ClientBuilder
                .standard()
                .withRegion(Regions.EU_WEST_1)
                .withCredentials(new AWSStaticCredentialsProvider(creds))
                .build();
        return s3client;        
    }

    @Bean
    public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
        return new S3SessionFactory(pAmazonS3);
    }

    @Bean
    public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory pS3SessionFactory) {
        S3InboundFileSynchronizer sync = new S3InboundFileSynchronizer(pS3SessionFactory);
        sync.setPreserveTimestamp(true);
        sync.setDeleteRemoteFiles(false);
        String fullRemotePath = s3bucket.concat("/").concat(remoteDir);
        sync.setRemoteDirectory(fullRemotePath);
        sync.setFilter(new S3RegexPatternFileListFilter(".*\\.zip$"));
        return sync;
    }

    @Bean
    @InboundChannelAdapter(value = OUT_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
        S3InboundFileSynchronizer pS3InboundFileSynchronizer
    ) {
        S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource(pS3InboundFileSynchronizer);
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(new File(localDir));
        messageSource.setLocalFilter(new AcceptOnceFileListFilter<File>());
        return messageSource;
    }

    @Bean("s3filesChannel")
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow fileReadingFlow(
            S3InboundFileSynchronizingMessageSource pS3InboundFileSynchronizingMessageSource,
            GtfsBizkaibus pGtfsBizkaibus,
            @Qualifier("fileProcessor") MessageHandler pMessageHandler) {
        return IntegrationFlows
                .from(pS3InboundFileSynchronizingMessageSource, e -> e.poller(p -> p.fixedDelay(5, TimeUnit.SECONDS)))
                .handle(pMessageHandler)
                .get();
    }

    @Bean("fileProcessor")
    public MessageHandler fileProcessor() {
        FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(localDir));
        handler.setExpectReply(false); // end of pipeline, reply not needed
        handler.setFileExistsMode(FileExistsMode.APPEND);
        handler.setNewFileCallback((file, msg) -> {
            log.debug("New file created... " + file.getAbsolutePath());
        });
        return handler;
    }

Ответы [ 3 ]

2 голосов
/ 23 октября 2019

Вы можете использовать уведомления о событиях S3 и очередь SQS. По сути, когда объект добавляется в ваше ведро, S3 может публиковать событие в зарегистрированной очереди SQS. Затем ваше локальное приложение может долго опрашивать очередь на наличие новых событий и обрабатывать все добавленные события.

здесь для получения дополнительной информации.
1 голос
/ 23 октября 2019

На самом деле, S3InboundFileSynchronizingMessageSource выполняет всю необходимую работу за вас: когда новый файл добавляется в удаленную корзину, он загружается в локальный каталог и выводится как payload в сообщении, которое будет отправлено на настроенный канал. .

Когда удаленный файл изменяется, он также загружается в локальный каталог.

Начиная с версии 5.0, AbstractInboundFileSynchronizingMessageSource предоставляет эту опцию:

/**
 * Switch the local {@link FileReadingMessageSource} to use its internal
 * {@code FileReadingMessageSource.WatchServiceDirectoryScanner}.
 * @param useWatchService the {@code boolean} flag to switch to
 * {@code FileReadingMessageSource.WatchServiceDirectoryScanner} on {@code true}.
 * @since 5.0
 */
public void setUseWatchService(boolean useWatchService) {
    this.fileSource.setUseWatchService(useWatchService);
    if (useWatchService) {
        this.fileSource.setWatchEvents(
                FileReadingMessageSource.WatchEventType.CREATE,
                FileReadingMessageSource.WatchEventType.MODIFY,
                FileReadingMessageSource.WatchEventType.DELETE);
    }
}

Если это имеет какой-то смысл для вас.

Но да ... с уведомлением от S3 до SQS это также будет хорошим решением. В проекте Spring Integration AWS есть SqsMessageDrivenChannelAdapter.

0 голосов
/ 23 октября 2019

Наконец, как предложил @Artem Bilian, я использовал аннотацию @ServiceActivator. Вот полный пример:

import java.io.File;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.aws.inbound.S3InboundFileSynchronizer;
import org.springframework.integration.aws.inbound.S3InboundFileSynchronizingMessageSource;
import org.springframework.integration.aws.support.S3SessionFactory;
import org.springframework.integration.aws.support.filters.S3RegexPatternFileListFilter;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

import lombok.extern.log4j.Log4j2;

@Log4j2
@Configuration
public class S3Config {

    public static final String IN_CHANNEL_NAME = "s3filesChannel";

    @Value("${s3.credentials-access-key}") private String accessKey;
    @Value("${s3.credentials-secret-key}") private String secretKey;
    @Value("${s3.remote-dir}") private String remoteDir;
    @Value("${s3.bucket}") private String s3bucket;
    @Value("${s3.local-dir}") private String localDir;

    /*
     * AWS S3
     */
    @Bean
    public AmazonS3 getAmazonS3(

    ){
        BasicAWSCredentials creds = new BasicAWSCredentials(accessKey, secretKey);
        AmazonS3 s3client = AmazonS3ClientBuilder
                .standard()
                .withRegion(Regions.EU_WEST_1)
                .withCredentials(new AWSStaticCredentialsProvider(creds))
                .build();
        return s3client;        
    }

    @Bean
    public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
        return new S3SessionFactory(pAmazonS3);
    }

    @Bean
    public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory pS3SessionFactory) {
        S3InboundFileSynchronizer sync = new S3InboundFileSynchronizer(pS3SessionFactory);
        sync.setPreserveTimestamp(true);
        sync.setDeleteRemoteFiles(false);
        String fullRemotePath = s3bucket.concat("/").concat(remoteDir);
        sync.setRemoteDirectory(fullRemotePath);
        sync.setFilter(new S3RegexPatternFileListFilter(".*\\.zip$"));
        return sync;
    }

    @Bean
    @InboundChannelAdapter(value = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
        S3InboundFileSynchronizer pS3InboundFileSynchronizer
    ) {
        S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource(pS3InboundFileSynchronizer);
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(new File(localDir));
        messageSource.setUseWatchService(true);
        return messageSource;
    }

    @Bean("s3filesChannel")
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow fileReadingFlow(
            S3InboundFileSynchronizingMessageSource pS3InboundFileSynchronizingMessageSource,
            @Qualifier("fileProcessor") MessageHandler pMessageHandler) {
        return IntegrationFlows
                .from(pS3InboundFileSynchronizingMessageSource, e -> e.poller(p -> p.fixedDelay(5, TimeUnit.SECONDS)))
                .handle(pMessageHandler)
                .get();
    }

    @Bean("fileProcessor")
    public MessageHandler fileProcessor() {
        FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(localDir));
        handler.setExpectReply(false); // end of pipeline, reply not needed
        handler.setFileExistsMode(FileExistsMode.REPLACE);
        return handler;
    }

    @ServiceActivator(inputChannel = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
    public void asada(Message<?> message) {
        // TODO
        log.debug("<< New message!");
    }

}

Обратите внимание, что я заменил OUT_CHANNEL_NAME на IN_CHANNEL_NAME.

PS: я почти новичок в Spring Integration, поэтому я все еще изучаю его концепции.

...