AWS Клей для Redshift: дублирующиеся данные? - PullRequest
0 голосов
/ 19 сентября 2018

Вот несколько пунктов с точки зрения того, как у меня настроены вещи:

У меня есть файлы CSV, загруженные на S3, и настройка сканера Glue для создания таблицы и схемы.У меня есть настройка задания Glue, которая записывает данные из таблицы Glue в нашу базу данных Amazon Redshift с использованием соединения JDBC.Задание также отвечает за сопоставление столбцов и создание таблицы красного смещения.Повторно выполняя задание, я получаю повторяющиеся строки в красном смещении (как и ожидалось).

Однако есть ли способ заменить или удалить строки перед вставкой новых данных?

Функциональность BOOKMARK включена, но не работает.

Как подключиться к красному смещению, удалитьвсе данные как часть JOB перед отправкой данных в красное смещение в Python?

Ответы [ 4 ]

0 голосов
/ 15 ноября 2018

Если вы хотите выполнить полную загрузку, вы можете использовать библиотеку баз данных spark / Pyspark для перезаписи таблицы:

df.write\
  .format("com.databricks.spark.redshift")\
  .option("url", redshift_url)\
  .option("dbtable", redshift_table)\
  .option("user", user)\
  .option("password", readshift_password)\
  .option("aws_iam_role", redshift_copy_role)\
  .option("tempdir", args["TempDir"])\
  .mode("overwrite")\
  .save()

Per Databricks / Spark документация :

Перезапись существующей таблицы: по умолчанию эта библиотека использует транзакции для выполнения перезаписей, которые реализуются путем удаления таблицы назначения, создания новой пустой таблицы и добавления в нее строк.

Вы можете ознакомиться с документацией по базам данных в здесь

0 голосов
/ 20 сентября 2018

Вы можете использовать модуль Python pg8000 для подключения к Redfshift и выполнения SQL для удаления (удаления / усечения) данных из вашего скрипта Glue.pg8000 - это чистый python, поэтому он работает с Glue.

Проверьте эту ссылку: AWS Glue - Усечение таблицы postgres назначения перед вставкой

Я пробовал, и это работаетхорошо.Надеюсь, это поможет вам,

0 голосов
/ 20 сентября 2018

В настоящее время Glue не поддерживает создание закладок для источников JDBC.

Вы можете внедрить upsert / merge в Redshift в задании Glue, используя параметр postactions (код в Scala):

val fields = sourceDf.columns.mkString(",")

glueContext.getJDBCSink(
  catalogConnection = "RedshiftConnectionTest",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> "staging_schema.staging_table",
    "postactions" -> 
        s"""
           DELETE FROM dst_schema.dst_table USING staging_schema.staging_table AS S WHERE dst_table.id = S.id;
           INSERT INTO dst_schema.dst_table ($fields) SELECT $fields FROM staging_schema.staging_table;
           DROP TABLE IF EXISTS staging_schema.staging_table
        """
  )),
  redshiftTmpDir = tempDir,
  transformationContext = "redshift-output"
).writeDynamicFrame(DynamicFrame(sourceDf, glueContext))

Если вы просто хотите удалить существующую таблицу, вы можете использовать вместо нее параметр preactions:

glueContext.getJDBCSink(
  catalogConnection = "RedshiftConnectionTest",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> "dst_schema.dst_table",
    "preactions" -> "DELETE FROM dst_schema.dst_table"
  )),
  redshiftTmpDir = tempDir,
  transformationContext = "redshift-output"
).writeDynamicFrame(DynamicFrame(sourceDf, glueContext))
0 голосов
/ 19 сентября 2018

Пока у вас есть уникальный ключ в ваших таблицах, в идеале - целочисленный первичный ключ.Далее я расскажу об этом следующим образом:

  1. Реализация инструмента планирования, позволяющего запускать задания по порядку.Я рекомендую Airflow.
  2. Инициируйте задание Glue для чтения из источника и записи в промежуточную таблицу.(промежуточная таблица будет содержать только выходные данные этого прогона клея, не обязательно все строки)
  3. Дождаться завершения этого задания клея (с помощью инструмента планирования)
  4. Инициировать задание SQL, работающее в Redshiftчто:

a) удаляет совпадающие строки из целевой таблицы

delete from target
where id in (select id from staging);

b) Вставляет данные из промежуточной в целевую таблицу

insert into target select * from staging;

c) усекает промежуточный стол

г) вакуум и анализирует обе таблицы

vacuum target to 100 percent;
analyze target;
vacuum staging;
...