Использование AWS Glue для преобразования сжатых CSV-файлов в паркетные сбои - PullRequest
0 голосов
/ 24 августа 2018

У меня есть файл myfile.txt.gz в ведре в S3.Я использую работу клея для преобразования этого файла в паркет.Код показан ниже.

import com.amazonaws.services.glue.ChoiceOption
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.ResolveSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import com.amazonaws.services.glue.DynamicFrame 
import org.apache.hadoop.fs._; 
import org.apache.spark.sql.functions._ 


object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    val truncColUdf = udf((str: String) => if (str.length > 29999) str.substring(0, 29999) else str)

    //myfile
    val datasource21 = glueContext.getCatalogSource(database = "data", tableName = "myfile", redshiftTmpDir = "", transformationContext = "datasource21").getDynamicFrame()
    val revDF21_1 = datasource21.toDF().withColumn("body", truncColUdf(col("body")))
    val truncDynamicFrame21_1 = DynamicFrame(revDF21_1, glueContext)
    val applymapping21 = truncDynamicFrame21_1.applyMapping(mappings = Seq(("id", "bigint","id", "bigint"),
                ("body", "varchar(65535)", "body", "varchar(65535)")), caseSensitive = false, transformationContext = "applymapping21")
    val resolvechoice21 = applymapping21.resolveChoice(choiceOption = Some(ChoiceOption("make_struct")), transformationContext = "resolvechoice21")
    val dropnullfields21 = resolvechoice21.dropNulls(transformationContext = "dropnullfields21")
    val datasink21 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions("""{"path": "s3://mypath/myfilefolder"}"""), transformationContext = "datasink21", format = "parquet").writeDynamicFrame(dropnullfields21)

    Job.commit()
   }
}

Когда я запускаю это задание, я получаю сообщение об ошибке:

java.util.concurrent.RejectedExecutionException ThreadPoolExecutor уже завершил работу

Однако, когда я запускаю ту же самую работу, но с тем же файлом myfile.txt, но без архивации, она работает как задумано.Я также попытался запустить без линий усечения и получил ту же ошибку.Мне интересно, что означает эта ошибка и почему я получаю ее.

РЕДАКТИРОВАТЬ: После дополнительного тестирования, я думаю, что ошибка вызвана усечением, потому что все файлы, которые не требуют усечения работы.Единственное, о чем я могу подумать, это может вызвать проблемы, поскольку все файлы упакованы в архив, поэтому он не может прочитать столбцы для выполнения функции усечения.Может ли кто-нибудь подтвердить это и, возможно, предложить обходные пути?Заранее спасибо.

...