java.lang.IllegalArgumentException: Does not contain a valid host:port authority: gvs2:asdf12#@192.168.134.222:22

I am building an SFTP batch source plugin based on this; in short, I am connecting to an SFTP server and getting this issue:

java.lang.IllegalArgumentException: Does not contain a valid host:port authority: gvs2:asdf12#@192.168.134.222:22
at org.apache.hadoop.net.NetUtils.createSocketAddr(NetUtils.java:213) ~[hadoop-common-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.net.NetUtils.createSocketAddr(NetUtils.java:164) ~[hadoop-common-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.security.SecurityUtil.buildDTServiceName(SecurityUtil.java:322) ~[hadoop-common-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.fs.FileSystem.getCanonicalServiceName(FileSystem.java:322) ~[hadoop-common-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:543) ~[hadoop-common-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.fs.FileSystem.addDelegationTokens(FileSystem.java:527) ~[hadoop-common-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:140) ~[hadoop-mapreduce-client-core-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100) ~[hadoop-mapreduce-client-core-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80) ~[hadoop-mapreduce-client-core-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:243) ~[hadoop-mapreduce-client-core-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:216) ~[hadoop-mapreduce-client-core-2.7.3.2.6.5.0-292.jar:na]
at co.cask.hydrator.format.input.CombinePathTrackingInputFormat.getSplits(CombinePathTrackingInputFormat.java:61) ~[format-common-2.1.1-SNAPSHOT.jar:na]
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:127) ~[spark-core_2.11-2.3.0.2.6.5.0-292.jar:2.3.0.2.6.5.0-292]
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) ~[spark-core_2.11-2.3.0.2.6.5.0-292.jar:2.3.0.2.6.5.0-292]
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) ~[spark-core_2.11-2.3.0.2.6.5.0-292.jar:2.3.0.2.6.5.0-292]
at scala.Option.getOrElse(Option.scala:121) ~[scala-library-2.11.8.jar:na]

I get this error when I use the credentials gvs2:asdf12#. On the other hand, if I use a password with no special characters, for example gvs1:asdf123, it works fine.
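
The failure can be reproduced outside Hadoop. When the target string has no scheme, Hadoop's NetUtils.createSocketAddr wraps it in a dummy URI before pulling out the host and port (the "dummyscheme://" wrapping below is my reading of the Hadoop 2.7.x sources; treat it as an assumption). A raw '#' in the userinfo starts the URI fragment, so the parsed host comes back null, which is exactly the "Does not contain a valid host:port authority" condition:

import java.net.URI;

public class AuthorityParseDemo {
    public static void main(String[] args) {
        // Clean password: server-based URI parsing succeeds.
        URI ok = URI.create("dummyscheme://gvs1:asdf123@192.168.134.222:22");
        System.out.println(ok.getHost() + ":" + ok.getPort()); // 192.168.134.222:22

        // Password containing '#': the fragment starts at '#', the authority
        // collapses to "gvs2:asdf12", and no host can be extracted.
        URI bad = URI.create("dummyscheme://gvs2:asdf12#@192.168.134.222:22");
        System.out.println(bad.getHost());     // null
        System.out.println(bad.getFragment()); // @192.168.134.222:22
    }
}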

I make this SFTP connection using the Apache Hadoop 2.7.3 library, which does not support SFTP out of the box, so I used another utility mentioned here.
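
For context, Hadoop resolves a URI scheme to a FileSystem class through the fs.<scheme>.impl configuration key. A minimal sketch of how the custom class gets wired in (host, credentials and class name are the ones from this question; a reachable server is assumed):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class SftpSchemeWiring {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Map the sftp:// scheme onto the custom implementation.
        conf.set("fs.sftp.impl", "com.guavus.plugin.sftp.SFTPFileSystem");
        // The implementation below reads its connection settings from the
        // Configuration, not from the URI, so the credentials never have to
        // appear in the URI authority at all.
        conf.set("fs.sftp.host", "192.168.134.222");
        conf.set("fs.sftp.port", "22");
        conf.set("fs.sftp.user", "gvs2");
        conf.set("fs.sftp.password", "asdf12#");

        FileSystem fs = FileSystem.get(URI.create("sftp://192.168.134.222:22/"), conf);
        System.out.println(fs.getUri());
    }
}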

Here is my SFTPFileSystem implementation.

package com.guavus.plugin.sftp;

import ch.ethz.ssh2.*;
import ch.ethz.ssh2.sftp.ErrorCodes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

/**
 * Based on implementation posted on Hadoop JIRA.
 * Using Ganymede SSH lib for improved performance.
 *
 * @author wnagele
 * @see https://issues.apache.org/jira/browse/HADOOP-5732
 */
public class SFTPFileSystem extends FileSystem {
    private static Logger logger = LoggerFactory.getLogger(SFTPFileSystem.class);
    private final int MAX_BUFFER_SIZE = 32768;
    private final int DEFAULT_PORT = 22;
    private final String DEFAULT_KEY_FILE = "${user.home}/.ssh/id_rsa";
    private final String DEFAULT_KNOWNHOSTS_FILE = "${user.home}/.ssh/known_hosts";

    private final String PARAM_BUFFER_SIZE = "io.file.buffer.size";
    private final String PARAM_KEY_FILE = "fs.sftp.key.file";
    private final String PARAM_KEY_PASSWORD = "fs.sftp.key.password";
    private final String PARAM_KNOWNHOSTS = "fs.sftp.knownhosts";

    public static final String PARAM_HOST = "fs.sftp.host";
    public static final String PARAM_PORT = "fs.sftp.port";
    public static final String PARAM_USER = "fs.sftp.user";
    public static final String PARAM_PASSWORD = "fs.sftp.password";

    private Configuration conf;
    private URI uri;
    private SFTPv3Client client;
    private Connection connection;

    // This is an older implementation, kept here for future reference.
    @Override
    public void initialize(URI uri, Configuration conf) throws IOException {
//        Logger.getLogger("ch.ethz.ssh2").setLevel(Level.OFF);

        this.uri = uri;
        this.conf = conf;
        // If no explicit buffer was set use the maximum.
        // Also limit the buffer to the maximum value.
        int bufferSize = conf.getInt(PARAM_BUFFER_SIZE, -1);
        if (bufferSize > MAX_BUFFER_SIZE || bufferSize == -1)
            conf.setInt(PARAM_BUFFER_SIZE, MAX_BUFFER_SIZE);

        setConf(conf);

        try {
            connect();
        } catch (IOException e) {
            // Ensure to close down connections if we fail during initialization
            close();
            throw e;
        }
    }

    /**
     * The current implementation of @see KnownHosts does not support
     * known_hosts entries that use a non-default port.
     * If we encounter such an entry we wrap it into the known_hosts
     * format before looking it up.
     */
    private class PortAwareKnownHosts extends KnownHosts {
        public PortAwareKnownHosts(File knownHosts) throws IOException {
            super(knownHosts);
        }

        public int verifyHostkey(String hostname, int port, String serverHostKeyAlgorithm, byte[] serverHostKey) throws IOException {
            if (port != 22) {
                StringBuffer sb = new StringBuffer();
                sb.append('[');
                sb.append(hostname);
                sb.append("]:");
                sb.append(port);
                hostname = sb.toString();
            }

            return super.verifyHostkey(hostname, serverHostKeyAlgorithm, serverHostKey);
        }
    }

    protected void connect() throws IOException {
        if (client == null || !client.isConnected()) {
            String host = conf.get(PARAM_HOST);
            int port = conf.getInt(PARAM_PORT, DEFAULT_PORT);
            String key = conf.get(PARAM_KEY_FILE, DEFAULT_KEY_FILE);
            String keyPassword = conf.get(PARAM_KEY_PASSWORD);
            String user = conf.get(PARAM_USER);
            final String password = conf.get(PARAM_PASSWORD);
            String knownHostsFile = conf.get(PARAM_KNOWNHOSTS, null);

            logger.info("know hosts file: {}", knownHostsFile);

//            final PortAwareKnownHosts knownHosts = new PortAwareKnownHosts(new File(knownHostsFile));

            connection = new Connection(host, port);
            connection.connect(new ServerHostKeyVerifier() {
                @Override
                public boolean verifyServerHostKey(String hostname, int port, String serverHostKeyAlgorithm, byte[] serverHostKey) throws Exception {
//                    if (knownHosts.verifyHostkey(hostname, port, serverHostKeyAlgorithm, serverHostKey) == KnownHosts.HOSTKEY_IS_OK)
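                    // NOTE: host-key verification is disabled here; every server key is accepted.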
                    return true;
//                    throw new IOException("Couldn't verify host key for " + hostname);
                }
            });

            if (password != null) {
                if (connection.isAuthMethodAvailable(user, "password")) {
                    connection.authenticateWithPassword(user, password);
                } else if (connection.isAuthMethodAvailable(user, "keyboard-interactive")) {
                    connection.authenticateWithKeyboardInteractive(user, new InteractiveCallback() {
                        @Override
                        public String[] replyToChallenge(String name, String instruction, int numPrompts, String[] prompt, boolean[] echo) throws Exception {
                            switch (prompt.length) {
                                case 0:
                                    return new String[0];
                                case 1:
                                    return new String[]{password};
                            }
                            throw new IOException("Cannot proceed with keyboard-interactive authentication. Server requested " + prompt.length + " challenges, we only support 1.");
                        }
                    });
                } else {
                    throw new IOException("Server does not support any of our supported password authentication methods");
                }
            } else {
                connection.authenticateWithPublicKey(user, new File(key), keyPassword);
            }

            client = new SFTPv3Client(connection);
        }
    }

    @Override
    public void close() throws IOException {
        super.close();
        if (client != null && client.isConnected())
            client.close();
        if (connection != null)
            connection.close();
    }

    @Override
    public FSDataInputStream open(Path file) throws IOException {
        SFTPInputStream is = openInternal(file);
        return new FSDataInputStream(is);
    }

    @Override
    public FSDataInputStream open(Path file, int bufferSize) throws IOException {
        SFTPInputStream is = openInternal(file);
        return new FSDataInputStream(new BufferedFSInputStream(is, bufferSize));
    }

    private SFTPInputStream openInternal(Path file) throws IOException {
        if (getFileStatus(file).isDir())
            throw new IOException("Path " + file + " is a directory.");

        String path = file.toUri().getPath();
        SFTPv3FileHandle handle = client.openFileRO(path);
        SFTPInputStream is = new SFTPInputStream(handle, statistics);
        return is;
    }

    @Override
    public FSDataOutputStream create(Path file, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
        return createInternal(file, permission, overwrite, SFTPv3Client.SSH_FXF_CREAT | SFTPv3Client.SSH_FXF_WRITE | SFTPv3Client.SSH_FXF_TRUNC);
    }

    @Override
    public FSDataOutputStream append(Path file, int bufferSize, Progressable progress) throws IOException {
        return createInternal(file, null, true, SFTPv3Client.SSH_FXF_WRITE | SFTPv3Client.SSH_FXF_APPEND);
    }

    protected FSDataOutputStream createInternal(Path file, FsPermission permission, boolean overwrite, int flags) throws IOException {
        if (exists(file) && !overwrite)
            throw new IOException("File " + file + " exists");

        Path parent = file.getParent();
        if (!exists(parent))
            mkdirs(parent);

        SFTPv3FileAttributes attrs = null;
        if (permission != null) {
            attrs = new SFTPv3FileAttributes();
            attrs.permissions = (int) permission.toShort();
        }

        String path = file.toUri().getPath();
        SFTPv3FileHandle handle = client.openFile(path, flags, attrs);
        SFTPOutputStream os = new SFTPOutputStream(handle, statistics);
        return new FSDataOutputStream(os, statistics);
    }

    @Override
    public boolean mkdirs(Path file, FsPermission permission) throws IOException {
        if (!exists(file)) {
            Path parent = file.getParent();
            if (parent == null || mkdirs(parent, permission)) {
                String path = file.toUri().getPath();
                client.mkdir(path, permission.toShort());
            }
        }
        return true;
    }

    @Override
    public boolean rename(Path src, Path dst) throws IOException {
        String oldPath = src.toUri().getPath();
        String newPath = dst.toUri().getPath();
        client.mv(oldPath, newPath);
        return true;
    }

    @Override
    public boolean delete(Path file, boolean recursive) throws IOException {
        String path = file.toUri().getPath();
        if (!getFileStatus(file).isDir()) {
            client.rm(path);
            return true;
        }

        FileStatus[] dirEntries = listStatus(file);
        if (dirEntries != null && dirEntries.length > 0 && !recursive)
            throw new IOException("Directory: " + file + " is not empty.");

        for (FileStatus dirEntry : dirEntries)
            delete(dirEntry.getPath(), recursive);

        client.rmdir(path);
        return true;
    }

    @Override
    public boolean delete(Path file) throws IOException {
        return delete(file, false);
    }

    @Override
    public void setTimes(Path file, long mtime, long atime) throws IOException {
        FileStatus status = getFileStatus(file);
        String path = status.getPath().toUri().getPath();
        SFTPv3FileAttributes attrs = client.stat(path);
        attrs.mtime = (int) (mtime / 1000L);
        attrs.atime = (int) (atime / 1000L);
        client.setstat(path, attrs);
    }

    @Override
    public FileStatus getFileStatus(Path file) throws IOException {
        if (file.getParent() == null)
            return new FileStatus(-1, true, -1, -1, -1, new Path("/").makeQualified(this));

        try {
            String path = file.toUri().getPath();
            SFTPv3FileAttributes attrs = client.stat(path);
            return getFileStatus(attrs, file);
        } catch (SFTPException e) {
            if (e.getServerErrorCode() == ErrorCodes.SSH_FX_NO_SUCH_FILE)
                throw new FileNotFoundException(file.toString());
            throw e;
        }
    }

    private FileStatus getFileStatus(SFTPv3FileAttributes attrs, Path file) throws IOException {
        long length = attrs.size;
        boolean isDir = attrs.isDirectory();
        long modTime = attrs.mtime * 1000L;
        long accessTime = attrs.atime * 1000L;
        FsPermission permission = new FsPermission(attrs.permissions.shortValue());
        String user = Integer.toString(attrs.uid);
        String group = Integer.toString(attrs.gid);
        return new FileStatus(length, isDir, -1, -1, modTime, accessTime, permission, user, group, file);
    }

    @Override
    public FileStatus[] listStatus(Path path) throws IOException {
        FileStatus fileStat = getFileStatus(path);
        if (!fileStat.isDir())
            return new FileStatus[]{fileStat};

        List<SFTPv3DirectoryEntry> sftpFiles = client.ls(path.toUri().getPath());
        ArrayList<FileStatus> fileStats = new ArrayList<FileStatus>(sftpFiles.size());
        for (SFTPv3DirectoryEntry sftpFile : sftpFiles) {
            String filename = sftpFile.filename;
            if (!"..".equals(filename) && !".".equals(filename))
                fileStats.add(getFileStatus(sftpFile.attributes, new Path(path, filename).makeQualified(this)));
        }
        return fileStats.toArray(new FileStatus[0]);
    }

    @Override
    public boolean exists(Path file) {
        try {
            return getFileStatus(file) != null;
        } catch (IOException e) {
            return false;
        }
    }

    @Override
    public boolean isFile(Path file) throws IOException {
        return !getFileStatus(file).isDir();
    }

    @Override
    public URI getUri() {
        return uri;
    }

    @Override
    public void setWorkingDirectory(Path workDir) {
    }

    @Override
    public Path getWorkingDirectory() {
        return null;
    }
}

Here is how I use it (please see the prepareRun method):

package com.guavus.plugin.source;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.batch.Input;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.api.batch.BatchSourceContext;
import co.cask.hydrator.common.SourceInputFormatProvider;
import co.cask.hydrator.common.batch.JobUtils;
import co.cask.hydrator.format.FileFormat;
import co.cask.hydrator.format.input.CombinePathTrackingInputFormat;
import co.cask.hydrator.format.input.EmptyInputFormat;
import co.cask.hydrator.format.input.PathTrackingInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.guavus.plugin.sftp.SFTPFileSystem;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

import static com.guavus.plugin.sftp.SFTPFileSystem.*;


/**
 * {@link BatchSource} that reads from an SFTP server.
 */
@Plugin(type = BatchSource.PLUGIN_TYPE)
@Name(SFTP.NAME)
@Description("Batch source for an SFTP source.")
public class SFTP extends BatchSource<NullWritable, StructuredRecord, StructuredRecord> {
    //  public class SFTP extends FileSource {
    private final SFTPSourceConfig config;

    private final static String SFTP_IMPL_KEY = "fs.sftp.impl";
    private final static String SFTP_IMPL_CLASS = "com.guavus.plugin.sftp.SFTPFileSystem";

    private final static String SFTP_FILE_PREFIX = "sftp";

    public static final String NAME = "SFTP";

    private static Logger logger = LoggerFactory.getLogger(SFTP.class);

    public SFTP(SFTPSourceConfig config) {
        super();
        this.config = config;
    }

    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
    }

    /**
     * Prepare the Batch run. Used to configure the job before starting the run.
     *
     * @param context batch execution context
     * @throws Exception if there's an error during this method invocation
     */
    @Override
    public void prepareRun(BatchSourceContext context) throws Exception {
        Job job = JobUtils.createInstance();

        logger.info("Setting up sftp configurations");
        // set entries here, before FileSystem is used
        Configuration conf = getSFTPConfigurations(job);


        logger.info("Creating SFTP connection");

        FileSystem sftpFs = new SFTPFileSystem();

        URI uri = getURI(config);
        config.setPath(uri.toString());

//        Path path = new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
        Path path = new Path(uri);

        try {
            sftpFs.initialize(uri, conf);
        } catch (IOException ex) {
            logger.error("Unable to initialize SFTP connection.");
            throw new RuntimeException("Unable to initialize SFTP connection.", ex);
        }
        logger.info("SFTP connection successful.");

        FileStatus[] fileStatus = sftpFs.globStatus(path);

        logger.info("Files needs to be read: {}", String.join(" | ", Arrays.stream(fileStatus).map(status -> status.getPath().toString()).collect(Collectors.toCollection(ArrayList::new))));

        String inputFormatClass;
        if (fileStatus == null) {
            if (config.shouldAllowEmptyInput()) {
                inputFormatClass = EmptyInputFormat.class.getName();
            } else {
                throw new IOException(String.format("Input path %s does not exist", path));
            }
        } else {
            logger.info("creating input format");
            FileInputFormat.addInputPath(job, path);
            FileInputFormat.setMaxInputSplitSize(job, config.getMaxSplitSize());
            logger.info("config.properties beofre: {}", config.getProperties().getProperties());

            Map<String, String> properties = config.getProperties().getProperties();
            logger.info("config.properties after: {}", properties.toString());
            PathTrackingInputFormat.configure(job, config, properties);
            FileFormat format = config.getFormat();
            if (format == FileFormat.BLOB) {
                inputFormatClass = PathTrackingInputFormat.class.getName();
            } else {
                inputFormatClass = CombinePathTrackingInputFormat.class.getName();
            }
        }

        // set entries here again, in case anything set by PathTrackingInputFormat should be overridden
        conf.set(FileInputFormat.SPLIT_MINSIZE, Long.toString(Long.MAX_VALUE));
        conf.set(SFTP_IMPL_KEY, SFTP_IMPL_CLASS);

//        conf.set("mapreduce.input.fileinputformat.inputdir", path.toString());

        logger.info("mapreduce url: {}", conf.get("mapreduce.input.fileinputformat.inputdir"));
        context.setInput(Input.of(config.getReferenceName(), new SourceInputFormatProvider(inputFormatClass, conf)));
    }


    /**
     * Creates a configuration object with all the information needed to make an SFTP connection.
     *
     * @param job the job whose configuration is populated with the SFTP settings
     * @return the populated configuration
     */
    private Configuration getSFTPConfigurations(Job job) {

        Configuration conf = job.getConfiguration();

        conf.set(SFTP_IMPL_KEY, SFTP_IMPL_CLASS);

        // Limit the number of splits to 1 since FTPInputStream does not support seek.
        conf.set(FileInputFormat.SPLIT_MINSIZE, Long.toString(Long.MAX_VALUE));

        conf.set(PARAM_HOST, config.getHost().trim());
        conf.set(PARAM_PORT, Integer.toString(config.getPort()));

        conf.set(PARAM_USER, config.getUsername().trim());
        conf.set(PARAM_PASSWORD, config.getPassword());
        return conf;
    }

    /**
     * Creates the sftp URI by combining the credentials and path provided.
     * The multi-argument URI constructor percent-encodes characters that are
     * not legal in the userinfo component (such as '#').
     * This URI is used by Apache Hadoop internally to extract path and user information.
     *
     * @param config the source configuration holding credentials, host, port and path
     * @return the assembled sftp URI
     */
    private URI getURI(SFTPSourceConfig config) {
        try {
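            // NOTE: this constructor percent-encodes the '#' in the userinfo (as "%23"),
            // but URI.getAuthority() returns the decoded form, which appears to be what
            // Hadoop later hands to NetUtils -- the raw '#' in the stack trace above
            // suggests exactly that.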
            return new URI(SFTP_FILE_PREFIX, config.getUsername().trim() + ":" + config.getPassword().trim(), config.getHost().trim(), config.getPort(), config.getPath().trim(), null, null);
        } catch (Exception ex) {
            throw new RuntimeException("Unable to create SFTP URI", ex);
        }
    }
}

  • My code can establish the SFTP connection successfully.

  • I tried passing the userinfo in quotes, like sftp://"root:asdf@123":192.168.192.201:22/home/nlsh/test/abc.csv, but it does not work.

  • I tried passing the password encoded with URLEncoder.encode(password, "UTF-8"), but then I cannot establish the connection at all (see the sketch below).
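
The encoding round-trip can be checked in isolation. A minimal sketch (the inference that Hadoop reads the decoded authority is drawn from the stack trace above, not from the Hadoop sources):

import java.net.URI;
import java.net.URISyntaxException;

public class UserInfoEncodingDemo {
    public static void main(String[] args) throws URISyntaxException {
        URI uri = new URI("sftp", "gvs2:asdf12#", "192.168.134.222", 22,
                "/home/nlsh/test/abc.csv", null, null);
        System.out.println(uri);                   // ...gvs2:asdf12%23@... ('#' is encoded)
        System.out.println(uri.getRawAuthority()); // gvs2:asdf12%23@192.168.134.222:22
        System.out.println(uri.getAuthority());    // gvs2:asdf12#@192.168.134.222:22 (decoded again)
    }
}

So even though the constructor encodes the password, the decoded authority is what resurfaces, which would explain why both the quoting and the URLEncoder attempts fail in different ways. Since connect() already takes its credentials from fs.sftp.user and fs.sftp.password, leaving the userinfo out of the URI entirely seems like the natural thing to try.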

Libraries:

  • Hadoop: 2.7.3
  • Spark: 2.3

...