Я создал базу данных под названием «glue-demo-db» и создал каталог для таблиц «заказов». Теперь я планирую написать свой собственный Scala скрипт для выполнения ETL. Работа проста: считайте данные из таблицы glue-demo-db.orders и запишите их как CSV в корзину S3. По какой-то причине: 1) диаграмма не может быть сгенерирована для Scala и 2) задание продолжает выполняться непрерывно без завершения или создания выходного файла. Любая помощь будет принята с благодарностью.
Код:
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import java.util.Calendar
import org.apache.spark.SparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.Trigger
import scala.collection.JavaConverters._
object ordersDataCapture {
def main(sysArgs: Array[String]) {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
val sparkSession: SparkSession = glueContext.getSparkSession
// @params: [JOB_NAME]
val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
Job.init(args("JOB_NAME"), glueContext, args.asJava)
// @type: DataSource
// @args: [database = "glue-demo-db", table_name = "orders", redshift_tmp_dir = args["TempDir"], transformation_ctx = "<transformation_ctx>"]
// @return: <output>
// @inputs: []
val read_data = glueContext.getCatalogSource(database = "glue-demo-db", tableName = "orders", redshiftTmpDir = args("TempDir"), transformationContext = <transformation_ctx>).getDynamicFrame()
#Convert dynamic frame to data frame to use standard pyspark functions
val sourceDF = read_data.toDF()
sourceDF.printSchema()
sourceDF.show(false)
sourceDF.write // joined.write returns type DataFrameWriter
.mode(SaveMode.Append)
.format("csv")
.save("s3://nvs-ordersdata/output")
Job.commit()
}
}