У меня есть файл myfile.txt.gz
в ведре в S3.Я использую работу клея для преобразования этого файла в паркет.Код показан ниже.
import com.amazonaws.services.glue.ChoiceOption
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.ResolveSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import com.amazonaws.services.glue.DynamicFrame
import org.apache.hadoop.fs._;
import org.apache.spark.sql.functions._
object GlueApp {
def main(sysArgs: Array[String]) {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
Job.init(args("JOB_NAME"), glueContext, args.asJava)
val truncColUdf = udf((str: String) => if (str.length > 29999) str.substring(0, 29999) else str)
//myfile
val datasource21 = glueContext.getCatalogSource(database = "data", tableName = "myfile", redshiftTmpDir = "", transformationContext = "datasource21").getDynamicFrame()
val revDF21_1 = datasource21.toDF().withColumn("body", truncColUdf(col("body")))
val truncDynamicFrame21_1 = DynamicFrame(revDF21_1, glueContext)
val applymapping21 = truncDynamicFrame21_1.applyMapping(mappings = Seq(("id", "bigint","id", "bigint"),
("body", "varchar(65535)", "body", "varchar(65535)")), caseSensitive = false, transformationContext = "applymapping21")
val resolvechoice21 = applymapping21.resolveChoice(choiceOption = Some(ChoiceOption("make_struct")), transformationContext = "resolvechoice21")
val dropnullfields21 = resolvechoice21.dropNulls(transformationContext = "dropnullfields21")
val datasink21 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions("""{"path": "s3://mypath/myfilefolder"}"""), transformationContext = "datasink21", format = "parquet").writeDynamicFrame(dropnullfields21)
Job.commit()
}
}
Когда я запускаю это задание, я получаю сообщение об ошибке:
java.util.concurrent.RejectedExecutionException ThreadPoolExecutor уже завершил работу
Однако, когда я запускаю ту же самую работу, но с тем же файлом myfile.txt
, но без архивации, она работает как задумано.Я также попытался запустить без линий усечения и получил ту же ошибку.Мне интересно, что означает эта ошибка и почему я получаю ее.
РЕДАКТИРОВАТЬ: После дополнительного тестирования, я думаю, что ошибка вызвана усечением, потому что все файлы, которые не требуют усечения работы.Единственное, о чем я могу подумать, это может вызвать проблемы, поскольку все файлы упакованы в архив, поэтому он не может прочитать столбцы для выполнения функции усечения.Может ли кто-нибудь подтвердить это и, возможно, предложить обходные пути?Заранее спасибо.