Чтение и запись файла из и в winSCP из хранилища объектов S3 - PullRequest
1 голос
/ 19 февраля 2020

Я пытаюсь поместить и прочитать файл из удаленной файловой системы, используя winSCP через соединение SFTP. Конечный узел файловой системы - это хранилище объектов s3, которое содержит файлы (например, xyz.txt). Ниже приведен переопределенный метод класса File Channel.

XYZFileSystemProvider

public class XYZFileSystemProvider extends FileSystemProvider {
    @Override
        public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs)
                throws IOException {
            // TODO Auto-generated method stub

            Collection<XYZOptions.OpenMode> modes = XYZOptions.OpenMode.fromOpenOptions(options);
            if (modes.isEmpty()) {
                modes = EnumSet.of(XYZOptions.OpenMode.Read, MFEOptions.OpenMode.Write);
            }
            // TODO: process file attributes
            return new XYZFileSystemChannel(path, modes);
        }
}

XYZFileSystemChannel

public class XYZFileSystemChannel extends XYZRemotePathChannel{

    public XYZFileSystemChannel(XYZPath p, Collection<XYZOptions.OpenMode> modes) throws IOException {
        this(Objects.requireNonNull(p, "No target path").toString(), p.getFileSystem(), modes);
    }

    public XYZFileSystemChannel(String remotePath, XYZFileSystem fs, Collection<XYZOptions.OpenMode> modes) throws IOException {
        super(remotePath, fs, true, modes);
    }

}

XYZRemotePathChannel

public class XYZRemotePathChannel extends FileChannel {

    private AmazonS3Component getAmazonS3Instance() {
        return SpringContext.getBean(AmazonS3Component.class);
    }

    private final String path;
    private final Collection<XYZOptions.OpenMode> modes;
    private final boolean closeOnExit;
    private XYZFileSystem fileSystem;
    private final AtomicLong posTracker = new AtomicLong(0L);
    public static final Set<XYZOptions.OpenMode> READ_MODES =
            Collections.unmodifiableSet(EnumSet.of(XYZOptions.OpenMode.Read));
    private final Object lock = new Object();
    private final AtomicReference<Thread> blockingThreadHolder = new AtomicReference<>(null);

    public XYZRemotePathChannel(String path, XYZFileSystem fileSystem, boolean closeOnExit,
            Collection<XYZOptions.OpenMode> modes) throws IOException {
        this.path = ValidateUtils.checkNotNullAndNotEmpty(path, "No remote file path specified");
        this.modes = Objects.requireNonNull(modes, "No channel modes specified");
        this.closeOnExit = closeOnExit;
        this.fileSystem = fileSystem;
    }

    @Override
    public int read(ByteBuffer dst) throws IOException {
        // TODO Auto-generated method stub
        log.debug("Position of dst is : {}",dst.position());
        log.debug("Reading the bytes of the file : {}", dst);
        //Some code to be done here in order to read dst and send bytes of the file recieved from s3 store
        return (int) doRead(Collections.singletonList(dst), -1);
    }

    protected long doRead(List<ByteBuffer> buffers, long position) throws IOException {
        log.debug("Do Reading the bytes of the file of list of buffer : {} and position :{}", buffers , position);
        ensureOpen(READ_MODES);
        synchronized (lock) {
            boolean completed = false;
            boolean eof = false;
            long curPos = (position >= 0L) ? position : posTracker.get();
            byte[] bytes = new byte[(int) curPos];
            try {
                long totalRead = 0;
                beginBlocking();
                String [] parts = this.path.toString().replaceFirst("^/", "").split("/");
                String bucket = parts[parts.length-2];
                String fileName = parts[parts.length-1];
                InputStream fileContent = getAmazonS3Instance().getFileFromBucket(bucket, fileName);
                log.debug("Contens of the file: {} from bucket: {} are : {}", fileName , bucket, fileContent);

                //Some code to be done here to return the content byte length??
                int fileLenght = fileContent.read(bytes, 1, (int) curPos);
                log.debug("After reading the file content the file length is : {}" , fileLenght );
                return fileLenght;
            } finally {
                if (position < 0L) {
                    posTracker.set(curPos);
                }
                endBlocking(completed);
            }
        }
    }

    private void endBlocking(boolean completed) throws AsynchronousCloseException {
        blockingThreadHolder.set(null);
        end(completed);
    }

    private void beginBlocking() {
        begin();
        blockingThreadHolder.set(Thread.currentThread());
    }

    @Override
    public FileChannel position(long newPosition) throws IOException {
        // TODO Auto-generated method stub
        log.debug("Setting the position of the file : {}", newPosition);
        if (newPosition < 0L) {
            throw new IllegalArgumentException("position(" + this.path + ") illegal file channel position: " + newPosition);
        }

        ensureOpen(Collections.emptySet());
        posTracker.set(newPosition);
        return this;
    }

    private void ensureOpen(Collection<XYZOptions.OpenMode> reqModes) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }

        if (GenericUtils.size(reqModes) > 0) {
            for (XYZOptions.OpenMode m : reqModes) {
                if (this.modes.contains(m)) {
                    return;
                }
            }

            throw new IOException("ensureOpen(" + this.path + ") current channel modes (" + this.modes
                    + ") do contain any of the required: " + reqModes);
        }
    }
}

XYZOptions

public class XYZOptions {

    enum OpenMode {
        Read, Write, Append, Create, Truncate, Exclusive;

        public static final Set<OpenOption> SUPPORTED_OPTIONS = Collections
                .unmodifiableSet(EnumSet.of(StandardOpenOption.READ, StandardOpenOption.APPEND,
                        StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE,
                        StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE));

        public static Set<OpenMode> fromOpenOptions(Collection<? extends OpenOption> options) {
            if (GenericUtils.isEmpty(options)) {
                return Collections.emptySet();
            }

            Set<OpenMode> modes = EnumSet.noneOf(OpenMode.class);
            for (OpenOption option : options) {
                if (option == StandardOpenOption.READ) {
                    modes.add(Read);
                } else if (option == StandardOpenOption.APPEND) {
                    modes.add(Append);
                } else if (option == StandardOpenOption.CREATE) {
                    modes.add(Create);
                } else if (option == StandardOpenOption.TRUNCATE_EXISTING) {
                    modes.add(Truncate);
                } else if (option == StandardOpenOption.WRITE) {
                    modes.add(Write);
                } else if (option == StandardOpenOption.CREATE_NEW) {
                    modes.add(Create);
                    modes.add(Exclusive);
                } else if (option == StandardOpenOption.SPARSE) {
                    continue;
                } else {
                    throw new IllegalArgumentException("Unsupported open option: " + option);
                }
            }

            return modes;
        }
    }

}

Я не могу извлечь файл из хранилища s3, но не знаю, как прочитать и передать все содержимое, в то время как кто-то перетаскивает из удаленного местоположения файла в свою собственную систему, используя winSCP. Я знаю, что мне не хватает какого-то кода в указанном месте, но я не уверен, как его достичь.

...