Вы можете использовать Spark вместе с интеграцией Salesforce для обработки объектов Salesforce или выполнения операций ETL с ними. Посмотрите на приведенный ниже код: -
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.DataFrame
import scala.collection.mutable.ListBuffer
import com.springml.spark.salesforce
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import com.sforce.async
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;
import org.apache.log4j.Logger
import com.sforce.soap.partner.sobject.SObject
import com.springml.salesforce.wave.api.{ ForceAPI, WaveAPI }
import java.text.SimpleDateFormat
import java.util.List
import java.io.{ BufferedWriter, File, FileWriter }
import org.apache.http.impl.client.{ DefaultHttpClient, BasicResponseHandler }
import org.apache.http.client.methods.HttpGet
import com.typesafe.config.ConfigFactory
import java.util.Properties
import org.apache.hadoop.fs._
import org.postgresql.Driver
object SparkSalesForceReaderWriter {
def SparkSForceIntegration(Spark: SparkSession) = {
val sc = Spark.sparkContext
val fs = FileSystem.get(sc.hadoopConfiguration)
val userName = "username"
val password = "password"
val config = new ConnectorConfig()
config.setUsername(userName)
config.setPassword(password)
val authEndpoint = ("https://login.salesforce.com/services/Soap/u/46.00") // https:login.salesforce.com/services/Soap/u/46.00
config.setAuthEndpoint(authEndpoint)
config.setServiceEndpoint(authEndpoint);
val conn = new PartnerConnection(config);
var connection = conn.describeGlobal()
val sobjectResults = connection.getSobjects()
var res = conn.describeSObject(" salesforce object name")
var fields = res.getFields().toBuffer;
println("Has " + fields.length + " fields");
var strFields = ""
fields.foreach { x =>
if (x == null || x == "" || x == " ") {
strFields = x.getName;
} else {
strFields = strFields + "," + x.getName;
}
}
var qry = "select " + strFields.toString.replaceFirst(",", "") + " from Account"
println("Your query is ==========>>>> " + qry)
val salesForceDataReader = Spark.read.format("com.springml.spark.salesforce")
.option("username", userName)
.option("password", password)
.option("soql", qry)
.option("version", "46.0")
.load()
}
}
Как только данные успешно считаны в фрейм данных spark, это как записать в s3 в scala / spark2 на s3n.
spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "[access key]")
spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "[secret key]")
spark.sparkContext.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
salesForceDataReader.write
.mode("overwrite")
.parquet("s3n://bucket/folder/parquet/myFile")
Это то, как это сделать с s3a, что является предпочтительным.
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "[access key]")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "[secret key]")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
salesForceDataReader.write
.mode("overwrite")
.parquet("s3a://bucket/folder/parquet/myFile")
При вышеуказанном подходе вы можете использовать скорость и производительность зажигания и выполнять свою работу ETL быстрее. Вы даже можете записать данные объекта salesforce в таблицы SQL, а также в файлы.