Извлечение данных из Salesforce в Amazon S3 - PullRequest
0 голосов
/ 31 октября 2019

Мое требование: Мне нужно вытащить полные данные, т. Е. Все объекты / поля из Salesforce в мое Amazon S3 Bucket для дальнейшего анализа данных на исторических данных. Мы очень полагаемся на пути с открытым исходным кодом.

Что я пробовал?

  1. Через задание Amazon Glue: в настоящее время я выполнял это действие для некоторых объектов Salesforce, таких как Opportunity, Leads, Accounts, Contactsиспользуя AWS Glue, где я написал несколько заданий Glue в Scala для подключения к Salesforce, извлечения данных в Spark Dataframes и выгрузки их в корзину S3.

2. Используя эту ссылку, я имеюпопытался вытянуть объекты Salesforce в S3. Не обращайте внимания на действия, связанные с публикацией Amazon S3 в этом URL

3.Data Virtuality Pipe (сторонний соединитель). Это полезно при извлечении столько объектов, сколько нам нужно. Существует ограничение в отношении того, где находится целевой дамп данных.

4.Пробовал несколько других разъемов, таких как Heroku.

Задачи:

1.Подбор данных даже для нескольких объектов занимает больше времени

Нам нужно создать несколько склеенных заданий (около 100+ объектов Salesforce, которые мы собираемся получить)

3. Преобразование данных (т. Е. Преобразование типов данных, очистка) огромно, так какРабота склеивания не поддерживает целостность типов данных при извлечении данных из Salesforce.

4. Техническое обслуживание, необходимое для выполнения работ.

5. Через сторонние разъемы, огромные затраты.

1 Ответ

0 голосов
/ 01 ноября 2019

Вы можете использовать Spark вместе с интеграцией Salesforce для обработки объектов Salesforce или выполнения операций ETL с ними. Посмотрите на приведенный ниже код: -

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.DataFrame
import scala.collection.mutable.ListBuffer
import com.springml.spark.salesforce
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import com.sforce.async
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;
import org.apache.log4j.Logger
import com.sforce.soap.partner.sobject.SObject
import com.springml.salesforce.wave.api.{ ForceAPI, WaveAPI }
import java.text.SimpleDateFormat
import java.util.List
import java.io.{ BufferedWriter, File, FileWriter }
import org.apache.http.impl.client.{ DefaultHttpClient, BasicResponseHandler }
import org.apache.http.client.methods.HttpGet
import com.typesafe.config.ConfigFactory
import java.util.Properties
import org.apache.hadoop.fs._
import org.postgresql.Driver

object SparkSalesForceReaderWriter {

  def SparkSForceIntegration(Spark: SparkSession) = {

    val sc = Spark.sparkContext
    val fs = FileSystem.get(sc.hadoopConfiguration)

    val userName = "username"
    val password = "password"

    val config = new ConnectorConfig()
    config.setUsername(userName)
    config.setPassword(password)
    val authEndpoint = ("https://login.salesforce.com/services/Soap/u/46.00") // https:login.salesforce.com/services/Soap/u/46.00
    config.setAuthEndpoint(authEndpoint)
    config.setServiceEndpoint(authEndpoint);
    val conn = new PartnerConnection(config);
    var connection = conn.describeGlobal()
    val sobjectResults = connection.getSobjects()
    var res = conn.describeSObject(" salesforce object name")
    var fields = res.getFields().toBuffer;
    println("Has " + fields.length + " fields");
    var strFields = ""

    fields.foreach { x =>
      if (x == null || x == "" || x == " ") {
        strFields = x.getName;
      } else {
        strFields = strFields + "," + x.getName;
      }
    }

    var qry = "select " + strFields.toString.replaceFirst(",", "") + " from Account"
    println("Your query is ==========>>>> " + qry)

    val salesForceDataReader = Spark.read.format("com.springml.spark.salesforce")
      .option("username", userName)
      .option("password", password)
      .option("soql", qry)
      .option("version", "46.0")
      .load()

  }

}

Как только данные успешно считаны в фрейм данных spark, это как записать в s3 в scala / spark2 на s3n.

spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "[access key]")
spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "[secret key]")
spark.sparkContext.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

salesForceDataReader.write
.mode("overwrite")
.parquet("s3n://bucket/folder/parquet/myFile")

Это то, как это сделать с s3a, что является предпочтительным.

spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "[access key]")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "[secret key]")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

salesForceDataReader.write
.mode("overwrite")
.parquet("s3a://bucket/folder/parquet/myFile")

При вышеуказанном подходе вы можете использовать скорость и производительность зажигания и выполнять свою работу ETL быстрее. Вы даже можете записать данные объекта salesforce в таблицы SQL, а также в файлы.

...