Как выполнить пакетную запись таблицы, содержащей более 10 миллионов записей, в отдельные файлы gzip - Python / Postgres / Airflow - PullRequest
0 голосов
/ 13 февраля 2020

У меня есть таблица, в которой более 10 миллионов записей (строк). Я пытаюсь выполнить однократную загрузку в s3, выбрав * 'таблицу и затем записав ее в файл gzip в моей локальной файловой системе. В настоящее время я могу запустить свой сценарий для сбора 800 000 записей в файл gzip, но затем я получаю сообщение об ошибке, и остальные записи, очевидно, не вставляются.

Поскольку в sql нет продолжения (например, если вы запустите 10 лимитов 800 000 запросов, это не будет в порядке).

Итак, есть ли способ написать функцию python / airflow, которая может загружать более 10 миллионов таблиц? Возможно, есть способ python, где я могу сделать оператор select * и продолжить оператор после x количества записей в отдельных файлах gzip?

Вот мой скрипт на python / airflow настолько далеко, что при запуске он только 800 000 записей записывают в переменную пути:

def gzip_postgres_table(table_name, **kwargs):
    path = '/usr/local/airflow/{}.gz'.format(table_name)
    server_post = create_tunnel_postgres()
    server_post.start()
    etl_conn = conn_postgres_internal(server_postgres)

    record = get_etl_record(kwargs['master_table'],
                              kwargs['table_name'])
    cur = etl_conn.cursor()

    unload_sql = '''SELECT *
                        FROM schema1.database1.{0} '''.format(record['table_name'])

    cur.execute(unload_sql)
    result = cur.fetchall()
    column_names = [i[0] for i in cur.description]
    fp = gzip.open(path, 'wt')
    myFile = csv.writer(fp, delimiter=',')
    myFile.writerow(column_names)
    myFile.writerows(result)
    fp.close()
    etl_conn.close()
    server_postgres.stop()

1 Ответ

0 голосов
/ 13 февраля 2020

Лучший, я имею в виду ЛУЧШИЙ подход для вставки такого количества записей в PostgreSQL или для получения их из PostgreSQL, это использовать postgresql COPY. Это означает, что вам придется кардинально изменить свой подход, но лучшего способа, который я знаю в PostgreSQL, нет. Руководство по копированию

COPY создает файл с запросом, который вы выполняете, или его можно вставить в таблицу из файла.

COPY перемещает данные между PostgreSQL таблицами и стандартными файлами файловой системы.

Причина, по которой это лучшее решение, заключается в том, что вы используете PostgreSQL метод по умолчанию для обработки внешних данных без посредников; так что это быстро и безопасно .

COPY работает как чудо с файлами CSV. Вам следует изменить подход к методу обработки файлов и использовать COPY.

Поскольку COPY работает с SQL, вы можете разделить ваши данные, используя LIMIT и OFFSET в запросе , Например:

COPY (SELECT * FROM country LIMIT 10 OFFSET 10) TO '/usr1/proj/bray/sql/a_list_of_10_countries.copy';
-- This creates 10 countries starting in the row 10

COPY работает только с файлами, которые доступны на сервере пользователю PostgreSQL.

Функция PL (отредактировано):

Если вы хотите, чтобы COPY был динамическим c, вы можете использовать COPY в функции PL. Например:

CREATE OR REPLACE FUNCTION copy_table(
    table_name text,
    file_name text,
    vlimit text,
    voffset text
)RETURNS VOID AS $$
DECLARE
    query text;
BEGIN
    query := 'COPY (SELECT * FROM country LIMIT '||vlimit||' OFFSET '||voffset||') TO '''||file_name||''' DELIMITER '','' CSV';
-- NOTE that file_name has to have its dir too.
    EXECUTE query;
END;$$ LANGUAGE plpgsql;
SECURITY DEFINER
LANGUAGE plpgsql;

Чтобы выполнить функцию, вам просто нужно сделать:

SELECT copy_table('test','/usr/sql/test.csv','10','10')

Примечания:

  • Если PL будет опубликован c, вы должны проверить на SQL инъекционные атаки.
  • Вы можете запрограммировать PL в соответствии с вашими потребностями, это только пример.
  • Функция возвращает VOID, поэтому она просто делает COPY, если вам нужна обратная связь, вы должны вернуть что-то еще.
  • Функция должна принадлежать пользователю postgres с сервера, потому что она требует доступа к файлу; именно поэтому ему нужен SEFURITY DEFINER, чтобы любой пользователь базы данных мог запустить PL.
...