необходимо выполнить проверку файла в s3 и скопировать его в две разные таблицы - PullRequest
0 голосов
/ 06 июня 2019

Я хочу проверить файл в s3 и отправить все действительные и недействительные данные в две разные таблицы в красном смещении.Кто-нибудь может помочь с примером?

1 Ответ

0 голосов
/ 11 июня 2019

Вы можете прочитать файл с S3 в режиме PERMISSIVE. В этом режиме Spark создаст дополнительный столбец _corrupt_record, который будет содержать информацию о найденной проблеме для конкретной строки. Затем вы можете фильтровать по этому столбцу, чтобы разделить фрейм данных как действительные и недействительные данные.

data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')

# Valid data 
validDF = (spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json(sc.parallelize(data))
  .filter(col("_corrupt_record").isNull())
)

display(validDF)

# Invalid data 
invalidDF = (spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json(sc.parallelize(data))
  .filter(col("_corrupt_record").isNotNull())
)

display(invalidDF)

Если вы хотите сохранить DataFrames в таблицу Redshift, вы можете сделать следующее:

preactions = "TRUNCATE schema.table_name"
# Load the data into Redshift    
validDF.write\
            .format("com.databricks.spark.redshift")\
            .option("url", db_redshift_url)\
            .option("user", user)\
            .option("password", password)\
            .option("dbtable", "schema.table_name")\
            .option("aws_iam_role", redshift_copy_role)\
            .option("tempdir", args["TempDir"])\
            .option("preactions", preactions)\
            .mode("append")\
            .save()

Приведенный выше код должен записывать DataFrame в Redshift, и вы можете использовать его в задании AWS Glue Spark. Нет необходимости использовать psycopg2.

...