piping postgres COPY в python с psycopg2 - PullRequest
8 голосов
/ 20 июля 2011

Я пишу скрипт для копирования некоторых данных между двумя компьютерами в одной сети с помощью psycopg2. Я заменяю какой-то старый, уродливый bash, который делает копию с

psql -c -h remote.host "COPY table TO STDOUT" | psql -c "COPY table FROM STDIN"

Это выглядит как самый простой и самый эффективный способ сделать копию. Это легко скопировать в python с помощью stringIO или временного файла, например:

buf = StringIO()

from_curs   = from_conn.cursor()
to_curs     = to_conn.cursor()

from_curs.copy_expert("COPY table TO STDOUT", buf)
buf.seek(0, os.SEEK_SET)
to_curs.copy_expert("COPY table FROM STDIN", buf)

... но это включает в себя сохранение всех данных на диск / в память.

Кто-нибудь нашел способ имитировать поведение канала Unix в такой копии? Кажется, я не могу найти объект unix-pipe, который не включает POpen - возможно, лучшее решение - это просто использовать POpen и подпроцесс.

Ответы [ 2 ]

13 голосов
/ 07 февраля 2012

Вам придется поместить один из ваших вызовов в отдельную ветку.Я только что понял, что вы можете использовать os.pipe () , что делает все остальное довольно простым:

#!/usr/bin/python
import psycopg2
import os
import threading

fromdb = psycopg2.connect("dbname=from_db")
todb = psycopg2.connect("dbname=to_db")

r_fd, w_fd = os.pipe()

def copy_from():
    cur = todb.cursor()
    cur.copy_from(os.fdopen(r_fd), 'table')
    cur.close()
    todb.commit()

to_thread = threading.Thread(target=copy_from)
to_thread.start()

cur = fromdb.cursor()
write_f = os.fdopen(w_fd, 'w')
cur.copy_to(write_f, 'table')
write_f.close()   # or deadlock...

to_thread.join()
0 голосов
/ 20 июля 2011

Для поддержки чтения и записи можно использовать деку, которую вы вложили в подкласс:

from collections import deque
from Exceptions import IndexError

class DequeBuffer(deque):
    def write(self, data):
        self.append(data)
    def read(self):
        try:
            return self.popleft()
        except IndexError:
            return ''

buf = DequeBuffer()

Если читатель работает намного быстрее, чем писатель, и таблица велика, deque все равно будетстановится большим, но это будет меньше, чем хранение всего этого.

Кроме того, я точно не знаю return '', когда deque пусто, это безопасно, а не повторять попытку, пока он не опустеет,но я предполагаю, что это так.Дайте мне знать, если это работает.

Не забывайте del buf, когда вы уверены, что копия сделана, особенно если сценарий не просто завершается в этот момент.

...