I noticed that the spark-sftp library (1.1.3) has several dependencies, and one of them is the sftp-client library (1.0.3). spark-sftp uses a few methods from sftp-client, which I duplicated in my own code.
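For reference, these are the two artifacts involved (a minimal build.sbt sketch; the coordinates are the springml ones, adjust the Scala version suffix and versions to your build):

libraryDependencies ++= Seq(
  // spark-sftp itself (pulls in sftp-client transitively)
  "com.springml" %% "spark-sftp" % "1.1.3",
  // sftp-client, whose SFTPClient class is used directly below
  "com.springml" % "sftp.client" % "1.0.3"
)

Here is my code, which works.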
// Imports used by the snippet below. The package paths for SFTPClient and
// DeleteTempFileShutdownHook are the springml ones; verify them against the library versions you use.
import java.io.File
import java.util.UUID
import com.springml.sftp.client.SFTPClient
import com.springml.spark.sftp.util.DeleteTempFileShutdownHook
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}

def runJob(): Unit = {
  try {
    val spark: SparkSession = initializeSpark() // my helper that builds the SparkSession
    import spark.sqlContext.implicits._
    // Create a sample DataFrame.
    val df: DataFrame = Seq(
      ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
      ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
      ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
      ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
      ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
    ).toDF("NAME", "START_DATE", "END_DATE", "STATUS")
    df.show()
    // Create the SFTP client; the first argument is the PEM/private-key file location,
    // which is not needed for password authentication.
    val sftpClient = new SFTPClient(null, "username", "password", "host", 22)
    val tmpFolder = System.getProperty("java.io.tmpdir")
    val hdfsTemp = tmpFolder
    val source = writeToTemp(spark, df, hdfsTemp, tmpFolder, "csv", "true", ";", "rowTag", "rootTag")
    println("source: " + source)
    // Copy the file to the SFTP server.
    sftpClient.copyToFTP(source, "/reports/example.csv")
  } catch {
    case e: Exception => e.printStackTrace()
  }
}
def writeToTemp(sparkSession: SparkSession, df: DataFrame, hdfsTemp: String, tempFolder: String,
                fileType: String, header: String, delimiter: String, rowTag: String, rootTag: String): String = {
  // fileType, rowTag and rootTag come from the library's generic signature and are not used for CSV.
  val randomSuffix = "spark_sftp_connection_temp_" + UUID.randomUUID
  val hdfsTempLocation = hdfsTemp + File.separator + randomSuffix
  val localTempLocation = tempFolder + File.separator + randomSuffix
  println("hdfsTempLocation: " + hdfsTempLocation)
  println("localTempLocation: " + localTempLocation)
  addShutdownHook(localTempLocation)
  // Write the DataFrame as a single CSV file into the temp location.
  df.coalesce(1).write.option("header", header).option("delimiter", delimiter).csv(hdfsTempLocation)
  // Copy it from HDFS to the local temp location (no-op if the temp location is already local).
  val copiedLocation = copyFromHdfs(sparkSession, hdfsTempLocation, localTempLocation)
  println("copiedLocation: " + copiedLocation)
  copiedFile(localTempLocation)
}
def addShutdownHook(tempLocation: String): Unit = {
  println("Adding hook for file " + tempLocation)
  // DeleteTempFileShutdownHook comes from the spark-sftp library and removes the temp file on JVM exit.
  val hook = new DeleteTempFileShutdownHook(tempLocation)
  Runtime.getRuntime.addShutdownHook(hook)
}
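// NOTE: if you would rather not depend on spark-sftp's internal DeleteTempFileShutdownHook,
// a minimal stand-in could look like the sketch below (this assumes the original hook simply
// deletes the temp directory on JVM exit; the name DeleteTempDirShutdownHook is mine).
// addShutdownHook could then register new DeleteTempDirShutdownHook(tempLocation) instead.
class DeleteTempDirShutdownHook(location: String) extends Thread {
  // Recursively delete the temp directory and everything in it.
  private def deleteRecursively(f: File): Unit = {
    val children = Option(f.listFiles()).getOrElse(Array.empty[File])
    children.foreach(deleteRecursively)
    f.delete()
  }
  override def run(): Unit = deleteRecursively(new File(location))
}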
def copyFromHdfs(sparkSession: SparkSession, hdfsTemp: String, fileLocation: String): String = {
  val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
  val hdfsPath = new Path(hdfsTemp)
  val fs = hdfsPath.getFileSystem(hadoopConf)
  if ("hdfs".equalsIgnoreCase(fs.getScheme)) {
    // The temp location is on HDFS: pull it to the local file system and schedule cleanup.
    fs.copyToLocalFile(new Path(hdfsTemp), new Path(fileLocation))
    fs.deleteOnExit(new Path(hdfsTemp))
    fileLocation
  } else {
    // Already on a local file system: nothing to copy.
    hdfsTemp
  }
}
def copiedFile(tempFileLocation: String): String = {
  val baseTemp = new File(tempFileLocation)
  // Pick the actual data file, skipping _SUCCESS markers, hidden files and .crc checksums.
  val files = baseTemp.listFiles().filter { x =>
    !x.isDirectory && !x.getName.contains("SUCCESS") && !x.isHidden && !x.getName.contains(".crc")
  }
  files(0).getAbsolutePath
}
I removed the codec parameter because it caused charset problems in the final CSV file.
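If compressed output is needed after all, the write call in writeToTemp could be extended like this (a sketch; as far as I know codec is just an alias for the CSV writer's compression option, and you should check that whatever consumes the uploaded file copes with the resulting encoding):

// Variant of the write inside writeToTemp with gzip compression turned back on.
df.coalesce(1)
  .write
  .option("header", header)
  .option("delimiter", delimiter)
  .option("compression", "gzip")
  .csv(hdfsTempLocation)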