Как отследить точное положение обработанных строк, чтобы возобновить работу после проблемы «соединение уже закрыто» - PullRequest
0 голосов
/ 15 мая 2019

Я делаю передачу наборов данных между двумя базами данных на разных серверах Postgres. Процесс займет пару недель из-за больших записей в исходной базе данных. Однако исходная база данных закрывает соединение примерно через 9-11 дней, поэтому при перезапуске мне нужен способ продолжить с последней операции после перезапуска.

В настоящее время я написал скрипт (python), который выполняет передачу. Поэтому я использую терминальный мультиплексор (экран), чтобы он работал на ВМ. Но затем, скажем, через неделю, исходный сервер базы данных закрывает соединение, поэтому процесс останавливается (без проблем с сеансом экрана, он все еще жив).

В моем сценарии я делаю так, чтобы набор результатов (в json) отображался, а затем передавался, так что я уверен, что процесс продолжается. Но когда исходный db-сервер закрывает соединение (так что никакой набор результатов снова не выбирается), и все, что я получаю, это сообщение connection already closed.

Пожалуйста, смотрите мой скрипт на python ниже (я совсем новичок в python).

Моя цель здесь - реализовать способ продолжения от ранее обработанной строки после возникновения проблемы connection already closed.

    db = "select to_json(d) from (  select \
        n.noise_data as measurand, \
        n.factor as \"sonometerClass\", \
        to_timestamp(n.seconds) as \"dateObserved\", \
        l.description as name, \
            json_build_object( \
                'coordinates', \
                json_build_array(l.node_lon, l.node_lat) \
            ) as location \
        from noise as n \
            inner join deployment as d on \
                d.deployment_id = n.deployment_id \
            inner join location as l on \
                l.location_id = d.location_id \
       ORDER BY n.seconds ASC \
    ) as d"
    return db

def main():

    conn = None
    try:
        params = config()
        with psycopg2.connect(**params) as conn:
            with conn.cursor(name='m_cursor') as cur:
                cur.itersize = 1000
                cur.execute(val_json())

                for row in tqdm(cur):
                    jData = json.dumps(row)
                    print(jData)

                   # do transfer stuff here...
                     ...
                    sleep(2)

                cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

if __name__ == '__main__':
    main()
...