AWS Интеграция Glue ETL с Spark и Scala - PullRequest
0 голосов
/ 04 мая 2020

Я создал базу данных под названием «glue-demo-db» и создал каталог для таблиц «заказов». Теперь я планирую написать свой собственный Scala скрипт для выполнения ETL. Работа проста: считайте данные из таблицы glue-demo-db.orders и запишите их как CSV в корзину S3. По какой-то причине: 1) диаграмма не может быть сгенерирована для Scala и 2) задание продолжает выполняться непрерывно без завершения или создания выходного файла. Любая помощь будет принята с благодарностью.

Код:

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import java.util.Calendar
import org.apache.spark.SparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.Trigger
import scala.collection.JavaConverters._

object ordersDataCapture {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession

    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // @type: DataSource
    // @args: [database = "glue-demo-db", table_name = "orders", redshift_tmp_dir = args["TempDir"], transformation_ctx = "<transformation_ctx>"]
    // @return: <output>
    // @inputs: []
    val read_data = glueContext.getCatalogSource(database = "glue-demo-db", tableName = "orders", redshiftTmpDir = args("TempDir"), transformationContext = <transformation_ctx>).getDynamicFrame()

    #Convert dynamic frame to data frame to use standard pyspark functions
    val sourceDF = read_data.toDF()
    sourceDF.printSchema()
    sourceDF.show(false)

    sourceDF.write                           // joined.write returns type DataFrameWriter
          .mode(SaveMode.Append)
          .format("csv")
          .save("s3://nvs-ordersdata/output")

    Job.commit()
  }
}
...