Я создаю плагин пакетного источника SFTP на основе this или, вкратце, вы можете сказать, что я подключаюсь к серверу SFTP и получаю эту проблему:
java.lang.IllegalArgumentException: Does not contain a valid host:port authority: gvs2:asdf12#@192.168.134.222:22
at org.apache.hadoop.net.NetUtils.createSocketAddr(NetUtils.java:213) ~[hadoop-common-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.net.NetUtils.createSocketAddr(NetUtils.java:164) ~[hadoop-common-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.security.SecurityUtil.buildDTServiceName(SecurityUtil.java:322) ~[hadoop-common-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.fs.FileSystem.getCanonicalServiceName(FileSystem.java:322) ~[hadoop-common-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:543) ~[hadoop-common-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.fs.FileSystem.addDelegationTokens(FileSystem.java:527) ~[hadoop-common-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:140) ~[hadoop-mapreduce-client-core-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100) ~[hadoop-mapreduce-client-core-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80) ~[hadoop-mapreduce-client-core-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:243) ~[hadoop-mapreduce-client-core-2.7.3.2.6.5.0-292.jar:na]
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:216) ~[hadoop-mapreduce-client-core-2.7.3.2.6.5.0-292.jar:na]
at co.cask.hydrator.format.input.CombinePathTrackingInputFormat.getSplits(CombinePathTrackingInputFormat.java:61) ~[format-common-2.1.1-SNAPSHOT.jar:na]
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:127) ~[spark-core_2.11-2.3.0.2.6.5.0-292.jar:2.3.0.2.6.5.0-292]
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) ~[spark-core_2.11-2.3.0.2.6.5.0-292.jar:2.3.0.2.6.5.0-292]
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) ~[spark-core_2.11-2.3.0.2.6.5.0-292.jar:2.3.0.2.6.5.0-292]
at scala.Option.getOrElse(Option.scala:121) ~[scala-library-2.11.8.jar:na]
Я получаю эту ошибку при использовании учетных данных: gvs2:asdf12#
. С другой стороны, если я использую пароль, который не имеет специальных символов, например, gvs1:asdf123
, тогда он прекрасно работает.
Я делаю это соединение SFTP, используя apache hadoop @ 2.7.3 библиотека, которая не поддерживает SFTP по умолчанию, поэтому я использовал другую упомянутую утилиту здесь .
Вот моя SFTPFileSystem
реализация.
package com.guavus.plugin.sftp;
import ch.ethz.ssh2.*;
import ch.ethz.ssh2.sftp.ErrorCodes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
/**
* Based on implementation posted on Hadoop JIRA.
* Using Ganymede SSH lib for improved performance.
*
* @author wnagele
* @see https://issues.apache.org/jira/browse/HADOOP-5732
*/
public class SFTPFileSystem extends FileSystem {
private static Logger logger = LoggerFactory.getLogger(SFTPFileSystem.class);
private final int MAX_BUFFER_SIZE = 32768;
private final int DEFAULT_PORT = 22;
private final String DEFAULT_KEY_FILE = "${user.home}/.ssh/id_rsa";
private final String DEFAULT_KNOWNHOSTS_FILE = "${user.home}/.ssh/known_hosts";
private final String PARAM_BUFFER_SIZE = "io.file.buffer.size";
private final String PARAM_KEY_FILE = "fs.sftp.key.file";
private final String PARAM_KEY_PASSWORD = "fs.sftp.key.password";
private final String PARAM_KNOWNHOSTS = "fs.sftp.knownhosts";
public static final String PARAM_HOST = "fs.sftp.host";
public static final String PARAM_PORT = "fs.sftp.port";
public static final String PARAM_USER = "fs.sftp.user";
public static final String PARAM_PASSWORD = "fs.sftp.password";
private Configuration conf;
private URI uri;
private SFTPv3Client client;
private Connection connection;
// This is older implementation. Keeping it here for future reference.
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
// Logger.getLogger("ch.ethz.ssh2").setLevel(Level.OFF);
this.uri = uri;
this.conf = conf;
// If no explicit buffer was set use the maximum.
// Also limit the buffer to the maximum value.
int bufferSize = conf.getInt(PARAM_BUFFER_SIZE, -1);
if (bufferSize > MAX_BUFFER_SIZE || bufferSize == -1)
conf.setInt(PARAM_BUFFER_SIZE, MAX_BUFFER_SIZE);
setConf(conf);
try {
connect();
} catch (IOException e) {
// Ensure to close down connections if we fail during initialization
close();
throw e;
}
}
/**
* The current implementation of @see KnownHosts does not support
* known_hosts entries that use a non-default port.
* If we encounter such an entry we wrap it into the known_hosts
* format before looking it up.
*/
private class PortAwareKnownHosts extends KnownHosts {
public PortAwareKnownHosts(File knownHosts) throws IOException {
super(knownHosts);
}
public int verifyHostkey(String hostname, int port, String serverHostKeyAlgorithm, byte[] serverHostKey) throws IOException {
if (port != 22) {
StringBuffer sb = new StringBuffer();
sb.append('[');
sb.append(hostname);
sb.append("]:");
sb.append(port);
hostname = sb.toString();
}
return super.verifyHostkey(hostname, serverHostKeyAlgorithm, serverHostKey);
}
}
protected void connect() throws IOException {
if (client == null || !client.isConnected()) {
String host = conf.get(PARAM_HOST);
int port = conf.getInt(PARAM_PORT, DEFAULT_PORT);
String key = conf.get(PARAM_KEY_FILE, DEFAULT_KEY_FILE);
String keyPassword = conf.get(PARAM_KEY_PASSWORD);
String user = conf.get(PARAM_USER);
final String password = conf.get(PARAM_PASSWORD);
String knownHostsFile = conf.get(PARAM_KNOWNHOSTS, null);
logger.info("know hosts file: {}", knownHostsFile);
// final PortAwareKnownHosts knownHosts = new PortAwareKnownHosts(new File(knownHostsFile));
connection = new Connection(host, port);
connection.connect(new ServerHostKeyVerifier() {
@Override
public boolean verifyServerHostKey(String hostname, int port, String serverHostKeyAlgorithm, byte[] serverHostKey) throws Exception {
// if (knownHosts.verifyHostkey(hostname, port, serverHostKeyAlgorithm, serverHostKey) == KnownHosts.HOSTKEY_IS_OK)
return true;
// throw new IOException("Couldn't verify host key for " + hostname);
}
});
if (password != null) {
if (connection.isAuthMethodAvailable(user, "password")) {
connection.authenticateWithPassword(user, password);
} else if (connection.isAuthMethodAvailable(user, "keyboard-interactive")) {
connection.authenticateWithKeyboardInteractive(user, new InteractiveCallback() {
@Override
public String[] replyToChallenge(String name, String instruction, int numPrompts, String[] prompt, boolean[] echo) throws Exception {
switch (prompt.length) {
case 0:
return new String[0];
case 1:
return new String[]{password};
}
throw new IOException("Cannot proceed with keyboard-interactive authentication. Server requested " + prompt.length + " challenges, we only support 1.");
}
});
} else {
throw new IOException("Server does not support any of our supported password authentication methods");
}
} else {
connection.authenticateWithPublicKey(user, new File(key), keyPassword);
}
client = new SFTPv3Client(connection);
}
}
@Override
public void close() throws IOException {
super.close();
if (client != null && client.isConnected())
client.close();
if (connection != null)
connection.close();
}
@Override
public FSDataInputStream open(Path file) throws IOException {
SFTPInputStream is = openInternal(file);
return new FSDataInputStream(is);
}
@Override
public FSDataInputStream open(Path file, int bufferSize) throws IOException {
SFTPInputStream is = openInternal(file);
return new FSDataInputStream(new BufferedFSInputStream(is, bufferSize));
}
private SFTPInputStream openInternal(Path file) throws IOException {
if (getFileStatus(file).isDir())
throw new IOException("Path " + file + " is a directory.");
String path = file.toUri().getPath();
SFTPv3FileHandle handle = client.openFileRO(path);
SFTPInputStream is = new SFTPInputStream(handle, statistics);
return is;
}
@Override
public FSDataOutputStream create(Path file, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
return createInternal(file, permission, overwrite, SFTPv3Client.SSH_FXF_CREAT | SFTPv3Client.SSH_FXF_WRITE | SFTPv3Client.SSH_FXF_TRUNC);
}
@Override
public FSDataOutputStream append(Path file, int bufferSize, Progressable progress) throws IOException {
return createInternal(file, null, true, SFTPv3Client.SSH_FXF_WRITE | SFTPv3Client.SSH_FXF_APPEND);
}
protected FSDataOutputStream createInternal(Path file, FsPermission permission, boolean overwrite, int flags) throws IOException {
if (exists(file) && !overwrite)
throw new IOException("File " + file + " exists");
Path parent = file.getParent();
if (!exists(parent))
mkdirs(parent);
SFTPv3FileAttributes attrs = null;
if (permission != null) {
attrs = new SFTPv3FileAttributes();
attrs.permissions = new Short(permission.toShort()).intValue();
}
String path = file.toUri().getPath();
SFTPv3FileHandle handle = client.openFile(path, flags, attrs);
SFTPOutputStream os = new SFTPOutputStream(handle, statistics);
return new FSDataOutputStream(os, statistics);
}
@Override
public boolean mkdirs(Path file, FsPermission permission) throws IOException {
if (!exists(file)) {
Path parent = file.getParent();
if (parent == null || mkdirs(parent, permission)) {
String path = file.toUri().getPath();
client.mkdir(path, permission.toShort());
}
}
return true;
}
@Override
public boolean rename(Path src, Path dst) throws IOException {
String oldPath = src.toUri().getPath();
String newPath = dst.toUri().getPath();
client.mv(oldPath, newPath);
return true;
}
@Override
public boolean delete(Path file, boolean recursive) throws IOException {
String path = file.toUri().getPath();
if (!getFileStatus(file).isDir()) {
client.rm(path);
return true;
}
FileStatus[] dirEntries = listStatus(file);
if (dirEntries != null && dirEntries.length > 0 && !recursive)
throw new IOException("Directory: " + file + " is not empty.");
for (FileStatus dirEntry : dirEntries)
delete(dirEntry.getPath(), recursive);
client.rmdir(path);
return true;
}
@Override
public boolean delete(Path file) throws IOException {
return delete(file, false);
}
@Override
public void setTimes(Path file, long mtime, long atime) throws IOException {
FileStatus status = getFileStatus(file);
String path = status.getPath().toUri().getPath();
SFTPv3FileAttributes attrs = client.stat(path);
attrs.mtime = new Long(mtime / 1000L).intValue();
attrs.atime = new Long(atime / 1000L).intValue();
client.setstat(path, attrs);
}
@Override
public FileStatus getFileStatus(Path file) throws IOException {
if (file.getParent() == null)
return new FileStatus(-1, true, -1, -1, -1, new Path("/").makeQualified(this));
try {
String path = file.toUri().getPath();
SFTPv3FileAttributes attrs = client.stat(path);
return getFileStatus(attrs, file);
} catch (SFTPException e) {
if (e.getServerErrorCode() == ErrorCodes.SSH_FX_NO_SUCH_FILE)
throw new FileNotFoundException(file.toString());
throw e;
}
}
private FileStatus getFileStatus(SFTPv3FileAttributes attrs, Path file) throws IOException {
long length = attrs.size;
boolean isDir = attrs.isDirectory();
long modTime = new Integer(attrs.mtime).longValue() * 1000L;
long accessTime = new Integer(attrs.atime).longValue() * 1000L;
FsPermission permission = new FsPermission(new Integer(attrs.permissions).shortValue());
String user = Integer.toString(attrs.uid);
String group = Integer.toString(attrs.gid);
return new FileStatus(length, isDir, -1, -1, modTime, accessTime, permission, user, group, file);
}
@Override
public FileStatus[] listStatus(Path path) throws IOException {
FileStatus fileStat = getFileStatus(path);
if (!fileStat.isDir())
return new FileStatus[]{fileStat};
List<SFTPv3DirectoryEntry> sftpFiles = client.ls(path.toUri().getPath());
ArrayList<FileStatus> fileStats = new ArrayList<FileStatus>(sftpFiles.size());
for (SFTPv3DirectoryEntry sftpFile : sftpFiles) {
String filename = sftpFile.filename;
if (!"..".equals(filename) && !".".equals(filename))
fileStats.add(getFileStatus(sftpFile.attributes, new Path(path, filename).makeQualified(this)));
}
return fileStats.toArray(new FileStatus[0]);
}
@Override
public boolean exists(Path file) {
try {
return getFileStatus(file) != null;
} catch (IOException e) {
return false;
}
}
@Override
public boolean isFile(Path file) throws IOException {
return !getFileStatus(file).isDir();
}
@Override
public URI getUri() {
return uri;
}
@Override
public void setWorkingDirectory(Path workDir) {
}
@Override
public Path getWorkingDirectory() {
return null;
}
}
Вот как яЯ использую это (пожалуйста, см. prepareRun
метод):
package com.guavus.plugin.source;
import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.batch.Input;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.api.batch.BatchSourceContext;
import co.cask.hydrator.common.SourceInputFormatProvider;
import co.cask.hydrator.common.batch.JobUtils;
import co.cask.hydrator.format.FileFormat;
import co.cask.hydrator.format.input.CombinePathTrackingInputFormat;
import co.cask.hydrator.format.input.EmptyInputFormat;
import co.cask.hydrator.format.input.PathTrackingInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.guavus.plugin.sftp.SFTPFileSystem;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
import static com.guavus.plugin.sftp.SFTPFileSystem.*;
/**
* {@link BatchSource} that reads from an SFTP server.
*/
@Plugin(type = BatchSource.PLUGIN_TYPE)
@Name(SFTP.NAME)
@Description("Batch source for an SFTP source.")
public class SFTP extends BatchSource<NullWritable, StructuredRecord, StructuredRecord> {
// public class SFTP extends FileSource {
private final SFTPSourceConfig config;
private final static String SFTP_IMPL_KEY = "fs.sftp.impl";
private final static String SFTP_IMPL_CLASS = "com.guavus.plugin.sftp.SFTPFileSystem";
private final static String SFTP_FILE_PREFIX = "sftp";
public static final String NAME = "SFTP";
private static Logger logger = LoggerFactory.getLogger(SFTP.class);
public SFTP(SFTPSourceConfig config) {
super();
this.config = config;
}
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
super.configurePipeline(pipelineConfigurer);
pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
}
/**
* Prepare the Batch run. Used to configure the job before starting the run.
*
* @param context batch execution context
* @throws Exception if there's an error during this method invocation
*/
@Override
public void prepareRun(BatchSourceContext context) throws Exception {
Job job = JobUtils.createInstance();
logger.info("Setting up sftp configurations");
// set entries here, before FileSystem is used
Configuration conf = getSFTPConfigurations(job);
logger.info("Creating SFTP connection");
FileSystem sftpFs = new SFTPFileSystem();
URI uri = getURI(config);
config.setPath(uri.toString());
// Path path = new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
Path path = new Path(uri);
try {
sftpFs.initialize(uri, conf);
} catch (IOException ex) {
logger.error("Unable to initialize SFTP connection.");
throw new RuntimeException("Unable to initialize SFTP connection.", ex);
}
logger.info("SFTP connection successful.");
FileStatus[] fileStatus = sftpFs.globStatus(path);
logger.info("Files needs to be read: {}", String.join(" | ", Arrays.stream(fileStatus).map(status -> status.getPath().toString()).collect(Collectors.toCollection(ArrayList::new))));
String inputFormatClass;
if (fileStatus == null) {
if (config.shouldAllowEmptyInput()) {
inputFormatClass = EmptyInputFormat.class.getName();
} else {
throw new IOException(String.format("Input path %s does not exist", path));
}
} else {
logger.info("creating input format");
FileInputFormat.addInputPath(job, path);
FileInputFormat.setMaxInputSplitSize(job, config.getMaxSplitSize());
logger.info("config.properties beofre: {}", config.getProperties().getProperties());
Map<String, String> properties = config.getProperties().getProperties();
logger.info("config.properties after: {}", properties.toString());
PathTrackingInputFormat.configure(job, config, properties);
FileFormat format = config.getFormat();
if (format == FileFormat.BLOB) {
inputFormatClass = PathTrackingInputFormat.class.getName();
} else {
inputFormatClass = CombinePathTrackingInputFormat.class.getName();
}
}
// set entries here again, in case anything set by PathTrackingInputFormat should be overridden
conf.set(FileInputFormat.SPLIT_MINSIZE, Long.toString(Long.MAX_VALUE));
conf.set(SFTP_IMPL_KEY, SFTP_IMPL_CLASS);
// conf.set("mapreduce.input.fileinputformat.inputdir", path.toString());
logger.info("mapreduce url: {}", conf.get("mapreduce.input.fileinputformat.inputdir"));
context.setInput(Input.of(config.getReferenceName(), new SourceInputFormatProvider(inputFormatClass, conf)));
}
/**
* Creates a configuration object with all the information that is needed to make a SFTP connection.
*
* @param job
* @return
*/
private Configuration getSFTPConfigurations(Job job) {
Configuration conf = job.getConfiguration();
conf.set(SFTP_IMPL_KEY, SFTP_IMPL_CLASS);
// Limit the number of splits to 1 since FTPInputStream does not support seek;
conf.set(FileInputFormat.SPLIT_MINSIZE, Long.toString(Long.MAX_VALUE));
conf.set(PARAM_HOST, config.getHost().trim());
conf.set(PARAM_PORT, Integer.toString(config.getPort()));
conf.set(PARAM_USER, config.getUsername().trim());
conf.set(PARAM_PASSWORD, config.getPassword());
return conf;
}
/**
* Creates sftp URI by combining credentials and path provided.
* It also escapes regex characters that are present in username or password.
* This URI is used by Apache Hadoop internally to extract path and user information.
*
* @param config
* @return
*/
private URI getURI(SFTPSourceConfig config) {
try {
return new URI(SFTP_FILE_PREFIX, config.getUsername().trim() + ":" + config.getPassword().trim(), config.getHost().trim(), config.getPort(), config.getPath().trim(), null, null);
} catch (Exception ex) {
throw new RuntimeException("Unable to create SFTP URI", ex);
}
}
}
Мой код может успешно установить соединение SFTP.
Iпопытался передать userinfo, я кавычки вроде sftp://"root:asdf@123":192.168.192.201:22/home/nlsh/test/abc.csv
, но не работает.
Я попытался передать закодированный пароль, используя URLEncoder.encode(password, "UTF-8")
, но затем я не могу установить соединение.
Библиотеки: - Hadoop: 2.7.3
- spark: 2.3