I have a 4 MB JSON file in S3. I crawled the data with AWS Glue, and it generated the corresponding Data Catalog table. I then created a job (using the ETL console in AWS Glue) to load the data into Amazon Redshift.
With the same data format but smaller files, the data loads into the database fine. Once the file reaches 4 MB, though, the job fails with:
"Произошла ошибка при вызове 0146.pyWriteDynamicFrame. Ошибка (код 1204) при загрузке данных в REdshift." Длина строки превышает длину DDL "
Can anyone help me solve this? My script is below; it was generated by the Glue console.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "tga", table_name = "db", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("attachments", "string", "attachments", "string"), ("classifications.Classification", "array", "`classifications.Classification`", "string"), ("code", "string", "code", "string"), ("completionmapping.NrtCompletion", "array", "`completionmapping.NrtCompletion`", "string"), ("componenttype", "string", "componenttype", "string"), ("contacts.Contact", "array", "`contacts.Contact`", "string"), ("createddate.DateTime", "string", "`createddate.DateTime`", "string"), ("createddate.OffsetMinutes", "string", "`createddate.OffsetMinutes`", "string"), ("currencyperiods.NrtCurrencyPeriod", "array", "`currencyperiods.NrtCurrencyPeriod`", "string"), ("currencystatus", "string", "currencystatus", "string"), ("datamanagers.DataManagerAssignment", "array", "`datamanagers.DataManagerAssignment`", "string"), ("islegacydata", "boolean", "islegacydata", "boolean"), ("isstreamlined", "boolean", "isstreamlined", "boolean"), ("mappinginformation.Mapping", "array", "`mappinginformation.Mapping`", "string"), ("recognitionmanagers.RecognitionManagerAssignment", "array", "`recognitionmanagers.RecognitionManagerAssignment`", "string"), ("restrictions", "string", "restrictions", "string"), ("reversemappinginformation.Mapping", "array", "`reversemappinginformation.Mapping`", "string"), ("title", "string", "title", "string"), ("updateddate.DateTime", "string", "`updateddate.DateTime`", "string"), ("updateddate.OffsetMinutes", "string", "`updateddate.OffsetMinutes`", "string"), ("_code", "string", "_code", "string"), ("_salesforceid", "string", "_salesforceid", "string"), ("_api", "string", "_api", "string"), ("_timestamp", "string", "_timestamp", "string"), ("parentcode", "string", "parentcode", "string"), ("parenttitle", "string", "parenttitle", "string"), ("releases.Release", "array", "`releases.Release`", "string"), ("usagerecommendations.UsageRecommendation", "array", "`usagerecommendations.UsageRecommendation`", "string")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "redshift", connection_options = {"dbtable": "trainingcomponentservicegetdetails", "database": "db"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
job.commit()
Scala script:
import com.amazonaws.services.glue.ChoiceOption
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.ResolveSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import org.apache.spark.sql.types.MetadataBuilder
import scala.collection.JavaConverters._
object GlueApp {
def main(sysArgs: Array[String]) {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("TempDir","JOB_NAME").toArray)
Job.init(args("JOB_NAME"), glueContext, args.asJava)
val datasource0 = glueContext.getCatalogSource(database = "tga", tableName = "trainingcomponentservicegetdetails", redshiftTmpDir = "", transformationContext = "datasource0").getDynamicFrame()
val applymapping1 = datasource0.applyMapping(mappings = Seq(("attachments", "string", "attachments", "string"), ("classifications.Classification", "array", "`classifications.Classification`", "string"), ("code", "string", "code", "string"), ("completionmapping.NrtCompletion", "array", "`completionmapping.NrtCompletion`", "string"), ("componenttype", "string", "componenttype", "string"), ("contacts.Contact", "array", "`contacts.Contact`", "string"), ("createddate.DateTime", "string", "`createddate.DateTime`", "string"), ("createddate.OffsetMinutes", "string", "`createddate.OffsetMinutes`", "string"), ("currencyperiods.NrtCurrencyPeriod", "array", "`currencyperiods.NrtCurrencyPeriod`", "string"), ("currencystatus", "string", "currencystatus", "string"), ("datamanagers.DataManagerAssignment", "array", "`datamanagers.DataManagerAssignment`", "string"), ("islegacydata", "boolean", "islegacydata", "boolean"), ("isstreamlined", "boolean", "isstreamlined", "boolean"), ("mappinginformation.Mapping", "array", "`mappinginformation.Mapping`", "string"), ("recognitionmanagers.RecognitionManagerAssignment", "array", "`recognitionmanagers.RecognitionManagerAssignment`", "string"), ("restrictions", "string", "restrictions", "string"), ("reversemappinginformation.Mapping", "array", "`reversemappinginformation.Mapping`", "string"), ("title", "string", "title", "string"), ("updateddate.DateTime", "string", "`updateddate.DateTime`", "string"), ("updateddate.OffsetMinutes", "string", "`updateddate.OffsetMinutes`", "string"), ("_code", "string", "_code", "string"), ("_salesforceid", "string", "_salesforceid", "string"), ("_api", "string", "_api", "string"), ("_timestamp", "long", "_timestamp", "long"), ("industrysectors.TrainingComponentIndustrySector", "array", "`industrysectors.TrainingComponentIndustrySector`", "string"), ("occupations.TrainingComponentOccupation", "array", "`occupations.TrainingComponentOccupation`", "string"), ("parentcode", "string", "parentcode", "string"), ("parenttitle", "string", "parenttitle", "string"), ("releases.Release", "array", "`releases.Release`", "string"), ("usagerecommendations.UsageRecommendation", "array", "`usagerecommendations.UsageRecommendation`", "string"), ("tpdevelopercode", "string", "tpdevelopercode", "string")), caseSensitive = false, transformationContext = "applymapping1")
val resolvechoice2 = applymapping1.resolveChoice(choiceOption = Some(ChoiceOption("make_cols")), transformationContext = "resolvechoice2")
val dropnullfields3 = resolvechoice2.dropNulls(transformationContext = "dropnullfields3")
val datasink4 = glueContext.getJDBCSink(catalogConnection = "redshift", options = JsonOptions("""{"dbtable": "trainingcomponentservicegetdetails", "database": "dbanasightmla"}"""), redshiftTmpDir = args("TempDir"), transformationContext = "datasink4").writeDynamicFrame(dropnullfields3)
Job.commit()
}
}
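One alternative I was about to try (using the same names as in the script above) is letting COPY truncate the oversized values, or recreating the target table with wider VARCHAR columns via a pre-action. I am assuming from the Glue documentation that the Redshift connection options accept "extracopyoptions" and "preactions"; I have not verified this:

// Sketch (untested): the same sink call, plus extra Redshift connection options.
// "extracopyoptions" is appended to the COPY command, so TRUNCATECOLUMNS should
// trim values that exceed the column width instead of failing with error 1204.
// "preactions" could instead recreate the table with wider VARCHAR columns.
val datasinkWide = glueContext.getJDBCSink(
  catalogConnection = "redshift",
  options = JsonOptions(
    """{"dbtable": "trainingcomponentservicegetdetails",
      |  "database": "dbanasightmla",
      |  "extracopyoptions": "TRUNCATECOLUMNS"}""".stripMargin),
  redshiftTmpDir = args("TempDir"),
  transformationContext = "datasinkWide"
).writeDynamicFrame(dropnullfields3)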
I also found this, but I can't get it to work:
https://github.com/databricks/spark-redshift
val columnLengthMap = Map(
"attachments" ->4000,
"classifications.Classification" ->4000,
"code" ->4000,
"completionmapping.NrtCompletion" ->4000,
"componenttype" ->4000,
"contacts.Contact" ->4000,
"createddate.DateTime" ->4000,
"createddate.OffsetMinutes" ->4000,
"currencyperiods.NrtCurrencyPeriod" ->4000,
"currencystatus" ->4000,
"datamanagers.DataManagerAssignment" ->4000,
"`datamanagers.DataManagerAssignment`" ->4000,
"islegacydata" ->4000,
"isstreamlined" ->4000,
"mappinginformation.Mapping" ->4000,
"recognitionmanagers.RecognitionManagerAssignment" ->4000,
"restrictions" ->4000,
"reversemappinginformation.Mapping" ->4000,
"title" ->4000,
"updateddate.DateTime" ->4000,
"updateddate.OffsetMinutes" ->4000,
"_code" ->4000,
"_salesforceid" ->4000,
"_api" ->4000,
"_timestamp" ->4000,
"industrysectors.TrainingComponentIndustrySector" ->4000,
"occupations.TrainingComponentOccupation" ->4000,
"parentcode" ->4000,
"parenttitle" ->4000,
"releases.Release" ->4000,
"usagerecommendations.UsageRecommendation" ->4000,
"tpdevelopercode" ->4000)
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.MetadataBuilder

// sqlContext is the SQLContext available in the job (e.g. spark.sqlContext).
// spark-redshift expects the "url" and "dbtable" option keys.
var df: DataFrame = sqlContext.read
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://anasight-redshift-mla.cf2ow8sevrix.ap-southeast-2.redshift.amazonaws.com:5439/dbanasightmla")
  .option("dbtable", "trainingcomponentservicegetdetails")
  .option("tempdir", "s3://redshift-anasight-2018/EMPLOYMENT")
  .load()

// Attach "maxlength" metadata to every column so spark-redshift emits
// VARCHAR(length) in the generated DDL instead of the default VARCHAR(256).
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  // Strip any backticks from the map key and re-quote it for the lookup,
  // because several of the column names contain dots.
  val plain = colName.replace("`", "")
  df = df.withColumn(plain, df(s"`$plain`").as(plain, metadata))
}
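As far as I understand, the maxlength metadata only takes effect when the annotated DataFrame is written back through spark-redshift, something like the sketch below. This is untested; the aws_iam_role value is a placeholder (credentials could also go into the URL or the "user"/"password" options):

import org.apache.spark.sql.SaveMode

// Sketch (untested): write the annotated DataFrame back so spark-redshift
// generates VARCHAR(maxlength) columns instead of the default VARCHAR(256).
// Overwrite drops and recreates the table, letting the new column widths apply.
df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://anasight-redshift-mla.cf2ow8sevrix.ap-southeast-2.redshift.amazonaws.com:5439/dbanasightmla")
  .option("dbtable", "trainingcomponentservicegetdetails")
  .option("tempdir", "s3://redshift-anasight-2018/EMPLOYMENT")
  .option("aws_iam_role", "<redshift-copy-role-arn>")
  .mode(SaveMode.Overwrite)
  .save()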