Создать файл для отслеживания хода выполнения задачи - PullRequest
0 голосов
/ 14 мая 2019

Запуск скрипта для передачи наборов данных, который требует около 3-4 месяцев для завершения через ssh. К сожалению, разрыв соединения через 6-8 дней, поэтому требуется перезагрузка.

Сценарий:

import psycopg2
from time import sleep
from config import config
from tqdm import tqdm
import requests
import json
import subprocess

subprocess.call("./airquality.sh", shell=True)

def val_json():
    db = "select to_json(d) from (  select \
        a.particles_data as particles, \
        a.o3_data as \"O3\", \
        to_timestamp(a.seconds) as \"dateObserved\", \
        l.description as name, \
            json_build_object( \
                'coordinates', \
                json_build_array(l.node_lon, l.node_lat) \
            ) as location \
        from airquality as a \
            inner join deployment as d on \
                d.deployment_id = a.deployment_id \
            inner join location as l on \
                l.location_id = d.location_id \
    ) as d"
    return db

def main():

    url = 'http://localhost:1026/v2/entities/003/attrs?options=keyValues'
    headers = {"Content-Type": "application/json", \
               "fiware-service": "urbansense",  \
               "fiware-servicepath": "/basic"}
    conn = None
    try:
        params = config()
        with psycopg2.connect(**params) as conn:
            with conn.cursor(name='my_cursor') as cur:
                cur.itersize = 2000
                cur.execute(val_json())
       # row = cur.fetchone()
                for row in tqdm(cur):
                    jsonData = json.dumps(row)
                    if jsonData.startswith('[') and jsonData.endswith(']'):
                        jsonData = jsonData[1:-1]
                        print(jsonData)
                    requests.post(url, data= jsonData, headers=headers)
                    sleep(1)

                cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

if __name__ == '__main__':
    main()

Как мне создать файл и следить за ходом передачи, чтобы при повторном запуске этого сценария (после потери соединения) наборы данных выбирались с того места, где оно ранее останавливалось?

EDIT:

Oops! Я потерян где-то по пути. Мне удалось запустить сценарии и записать ход выполнения в текстовый файл (air.txt), который я создал вручную с содержимым 0 (в противном случае сценарий вообще не будет работать). При запуске этого скрипта содержимое файла air.txt обновляется со значением позиции курсора.

Проблема:

Теперь у меня проблема в том, что когда я остановил выполнение скрипта (как способ проверки) и перезапустил его снова, чтобы убедиться, что он выбирает с предыдущей позиции, скрипты начинаются с 0, снова перезаписывая предыдущее значение (и начинают новый счет вместо того, чтобы читать это как начальную позицию). Ниже мой обновленный скрипт:

def val_json():
    db = "select to_json(d) from (  select \
        a.particles_data as particles, \
        a.o3_data as \"O3\", \
        to_timestamp(a.seconds) as \"dateObserved\", \
        l.description as name, \
            json_build_object( \
                'coordinates', \
                json_build_array(l.node_lon, l.node_lat) \
            ) as location \
        from airquality as a \
            inner join deployment as d on \
                d.deployment_id = a.deployment_id \
            inner join location as l on \
                l.location_id = d.location_id \
    ) as d"
    return db

def main():
    RESTART_POINT_FILE = 'air.txt'
    conn = None
    try:
        params = config()
        with open(RESTART_POINT_FILE) as fd:
           rows_to_skip = int(next(fd))
    #except OSError:
        rows_to_skip = 0
        with psycopg2.connect(**params) as conn:
            with conn.cursor(name='my_cursor') as cur:
                cur.itersize = 2000
                cur.execute(val_json())

                for processed_rows, row in enumerate(tqdm(cur)):
                    if processed_rows < rows_to_skip: continue
                    jsonData = json.dumps(row)
                    if jsonData.startswith('[') and jsonData.endswith(']'):
                        jsonData = jsonData[1:-1]

                        print('\n', processed_rows, '\t', jsonData)
                    #update progress file...
                    with open(RESTART_POINT_FILE, "w") as fd:
                        print(processed_rows, file=fd)
                    sleep(1)

                cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)

    finally:
        if conn is not None:
            conn.close()

if __name__ == '__main__':
    main()

Ответы [ 3 ]

1 голос
/ 14 мая 2019

Простой способ - использовать выделенный файл в известном месте.

Этот файл будет содержать одну строку с количеством строк, успешно обработанных или отсутствующих.

При запуске, если файл отсутствует, количество пропускаемых записей будет равно 0, а если оно присутствует, количество пропущенных записей будет равно числу в первой строке файла.Цикл должен быть изменен, чтобы пропустить эти записи и отслеживать номер последней обработанной записи.

При успешном завершении файл должен быть удален, и при ошибке, если он должен быть записан с номеромпоследняя успешно обработанная запись.

Скелетный код:

RESTART_POINT_FILE = ... # full path of the restart point file

# begin: read the file:
try:
    with open(RESTART_POINT_FILE) as fd:
        rows_to_skip = int(next(fd))
except OSError:
    rows_to_skip = 0

# loop:

                for processed_row, row in enumerate(tqdm(cur)):
                    if processed_row < rows_to_skip: continue
                    ...

# end
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        # write the file
        with open(RESTART_POINT_FILE, "w") as fd:
            print(processed_rows, file=fd)
    finally:
        if conn is not None:
            conn.close()
        # try to remove the file if it exists
        try:
            os.remove(RESTART_POINT_FILE)
        except OSError:
            pass

ВНИМАНИЕ: ничего не было проверено ...

0 голосов
/ 14 мая 2019

Если ваша проблема вызвана исключительно тайм-аутом вашего удаленного терминала ssh, простой ответ: используйте мультиплексор терминала, такой как tmux , screen , который будет работать на удаленном компьютере, и чтобы ваша программа работала даже с тайм-аутом сеансов, вам нужно будет только повторно подключиться, когда это удобно, и заново подключить терминал, чтобы увидеть, как он обрабатывает, или даже «отключить терминал», такой как nohup (но тогда вам нужно будет перенаправить стандартный вывод в файл при необходимости).

Однако это не спасет вас от случайного уничтожения OOM, перезагрузки сервера, ...), для этого регулярная сериализация состояния программы с механизмом перезагрузки является хорошей идеей

0 голосов
/ 14 мая 2019

попробуйте использовать цикл while для соединения с True или FALS, а когда соединение установлено с FALS, дождитесь его истины снова

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...