To improve code reuse, I don't want to create a GlueContext object in the main method, and I want to avoid adding a glueContext parameter to several methods. The previous code looks like this:
import com.amazonaws.services.glue.GlueContext
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object WorldCount {
  def main(args: Array[String]): Unit = {
    val glueContext = new GlueContext(new SparkContext())
    val spark = glueContext.getSparkSession
    func1(glueContext)
    func2(spark)
  }

  def func1(glueContext: GlueContext): Unit = ???

  def func2(spark: SparkSession): Unit = {
    import spark.implicits._
    // todo
  }
}
- I don't want to write the code that creates a glueContext object every time and then pass that glueContext object into each method.
So I created a GlueJob trait with the following code:
import com.amazonaws.services.glue.util.{GlueArgParser, JsonOptions}
import com.amazonaws.services.glue.{DynamicFrame, GlueContext}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}

trait GlueJob {
  lazy val glueContext: GlueContext = new GlueContext(new SparkContext())
  lazy val spark: SparkSession = glueContext.getSparkSession

  def argValues(args: Array[String], params: Array[String]): Array[String] = {
    val m = GlueArgParser.getResolvedOptions(args, params)
    params.map(m(_))
  }

  /**
   * Get DynamicFrame from glue catalog
   */
  def get(database: String, tableName: String,
          redshiftTmpDir: Option[String] = None,
          pushDownPredicate: Option[String] = None): DynamicFrame =
    (redshiftTmpDir, pushDownPredicate) match {
      case (Some(x), Some(y)) =>
        glueContext.getCatalogSource(
          database = database,
          tableName = tableName,
          redshiftTmpDir = x,
          pushDownPredicate = y).getDynamicFrame()
      case (Some(x), None) =>
        glueContext.getCatalogSource(
          database = database,
          tableName = tableName,
          redshiftTmpDir = x).getDynamicFrame()
      case (None, Some(y)) =>
        glueContext.getCatalogSource(
          database = database,
          tableName = tableName,
          pushDownPredicate = y).getDynamicFrame()
      case (None, None) =>
        glueContext.getCatalogSource(
          database = database,
          tableName = tableName).getDynamicFrame()
    }

  def readJson(path: String): DynamicFrame =
    read(List(path), "json")

  private def read(paths: List[String], format: String): DynamicFrame =
    glueContext.getSourceWithFormat(
      connectionType = "s3",
      options = JsonOptions(Map(
        "paths" -> paths
      )),
      format = format
    ).getDynamicFrame()

  def writeOrc(df: DataFrame, path: String,
               partitionKeys: Seq[String] = Seq.empty): Unit =
    write(df, path, "orc", partitionKeys)

  def writeParquet(df: DataFrame, path: String,
                   partitionKeys: Seq[String] = Seq.empty): Unit =
    write(df, path, "parquet", partitionKeys)

  def writeJson(df: DataFrame, path: String,
                partitionKeys: Seq[String] = Seq.empty): Unit =
    write(df, path, "json", partitionKeys)

  private def write(df: DataFrame, path: String,
                    format: String, partitionKeys: Seq[String]): Unit = {
    val options = if (partitionKeys.isEmpty)
      Map("path" -> path)
    else
      Map("path" -> path, "partitionKeys" -> partitionKeys)
    glueContext.getSinkWithFormat(
      connectionType = "s3",
      options = JsonOptions(options),
      format = format
    ).writeDynamicFrame(DynamicFrame(df, glueContext))
  }
}
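Since argValues is just a thin wrapper over GlueArgParser.getResolvedOptions, my understanding is that it resolves the job parameters that Glue passes to the job as --key value pairs. The snippet below only illustrates the expected shape of args; every value in it is a made-up placeholder:

// Illustration only: all values here are placeholders, not real job arguments.
val sampleArgs = Array(
  "--input_bucket_name", "my-input-bucket",
  "--input_path", "raw/"
)
// argValues(sampleArgs, Array("input_bucket_name", "input_path")) should
// return the resolved values in the same order as the requested params:
// Array("my-input-bucket", "raw/")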
This way I no longer need to create a glueContext object in the main method, pass a glueContext or SparkSession object into each method, or import spark.implicits._ over and over:
object WorldCount extends GlueJob {
  def main(args: Array[String]): Unit = {
    func1()
    func2()
  }

  def func1(): Unit = {
    // glueContext.getCatalogSource()
  }

  def func2(): Unit = {
    // import spark.implicits._
    // todo
  }
}
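For reference, a complete job built on the trait then looks roughly like the sketch below; ExampleJob, the database, table, bucket, and the year column are placeholder names made up for illustration, and only the GlueJob helpers come from the trait above:

// Hypothetical job: my_db, my_table, my-bucket and "year" are placeholders.
object ExampleJob extends GlueJob {
  def main(args: Array[String]): Unit = {
    import spark.implicits._
    // Read a table registered in the Glue Data Catalog via the trait helper
    val dyf = get(database = "my_db", tableName = "my_table")
    // Continue with the plain DataFrame API
    val df = dyf.toDF().filter($"year" === "2020")
    // Write the result back to S3 as Parquet, partitioned by year
    writeParquet(df, "s3://my-bucket/output/", Seq("year"))
  }
}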
But when I tested it, I ran into a strange problem. Below is the test code.
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import com.starlight.data.glue.{GlueJob, S3Utils}
import org.apache.spark.sql.functions._

object Test extends GlueJob {
  import spark.implicits._

  val partitionKeys = "year" :: "month" :: "day" :: Nil
  val params = Array("diff_days", "input_bucket_name", "input_path", "output_bucket_name", "output_path")

  def main(args: Array[String]): Unit = {
    val Array(diffDays, inputBucketName,
              inputPath, outputBucketName,
              outputPath) = this.argValues(args, params)

    // Get the date of the data to process, diff_days days back from today
    val getDataDate = (diff_days: Int) => {
      val rightNow: Calendar = Calendar.getInstance()
      rightNow.setTime(new Date())
      rightNow.add(Calendar.DAY_OF_YEAR, diff_days * (-1))
      val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
      dateFormat.format(rightNow.getTime)
        .split("-").toList
    }

    for (d <- 0 to diffDays.toInt) {
      val year :: month :: day :: _ = getDataDate(d)
      S3Utils.deleteFolder(outputBucketName, s"${outputPath}year=$year/month=$month/day=$day/")
      // Select all files in the TXT directory for parsing
      val inputFilePrefix = s"$inputPath$year/$month/$day/TXT"
      val allInputFileName = S3Utils.listObjects(inputBucketName, inputFilePrefix)
      if (allInputFileName.nonEmpty) {
        val jsonDF = this.readJson(s"s3://$inputBucketName/$inputFilePrefix").toDF()
        var dataDF = jsonDF.withColumn("dataList", explode($"data"))
        // Cast null- and array-typed columns to string so they can be written as ORC
        dataDF.schema.fields.foreach(f => {
          f.dataType.typeName.toLowerCase match {
            case "null" | "array" => dataDF = dataDF.withColumn(f.name, dataDF(f.name).cast("string"))
            case _ =>
          }
        })
        this.writeOrc(dataDF, s"s3://$outputBucketName/$outputPath", partitionKeys)
      }
    }
  }
}
Error log:
java.lang.IllegalStateException: User did not initialize spark context!
at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:485)
at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:779)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:778)
at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:803)
at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Continuous Logging: Shutting down cloudwatch appender.