Вы можете прочитать файл с S3 в режиме PERMISSIVE
. В этом режиме Spark создаст дополнительный столбец _corrupt_record
, который будет содержать информацию о найденной проблеме для конкретной строки. Затем вы можете фильтровать по этому столбцу, чтобы разделить фрейм данных как действительные и недействительные данные.
data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')
# Valid data
validDF = (spark.read
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "_corrupt_record")
.json(sc.parallelize(data))
.filter(col("_corrupt_record").isNull())
)
display(validDF)
# Invalid data
invalidDF = (spark.read
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "_corrupt_record")
.json(sc.parallelize(data))
.filter(col("_corrupt_record").isNotNull())
)
display(invalidDF)
Если вы хотите сохранить DataFrames в таблицу Redshift, вы можете сделать следующее:
preactions = "TRUNCATE schema.table_name"
# Load the data into Redshift
validDF.write\
.format("com.databricks.spark.redshift")\
.option("url", db_redshift_url)\
.option("user", user)\
.option("password", password)\
.option("dbtable", "schema.table_name")\
.option("aws_iam_role", redshift_copy_role)\
.option("tempdir", args["TempDir"])\
.option("preactions", preactions)\
.mode("append")\
.save()
Приведенный выше код должен записывать DataFrame в Redshift, и вы можете использовать его в задании AWS Glue Spark. Нет необходимости использовать psycopg2.