Запуск скрипта для передачи наборов данных, который требует около 3-4 месяцев для завершения через ssh. К сожалению, разрыв соединения через 6-8 дней, поэтому требуется перезагрузка.
Сценарий:
import psycopg2
from time import sleep
from config import config
from tqdm import tqdm
import requests
import json
import subprocess
subprocess.call("./airquality.sh", shell=True)
def val_json():
db = "select to_json(d) from ( select \
a.particles_data as particles, \
a.o3_data as \"O3\", \
to_timestamp(a.seconds) as \"dateObserved\", \
l.description as name, \
json_build_object( \
'coordinates', \
json_build_array(l.node_lon, l.node_lat) \
) as location \
from airquality as a \
inner join deployment as d on \
d.deployment_id = a.deployment_id \
inner join location as l on \
l.location_id = d.location_id \
) as d"
return db
def main():
url = 'http://localhost:1026/v2/entities/003/attrs?options=keyValues'
headers = {"Content-Type": "application/json", \
"fiware-service": "urbansense", \
"fiware-servicepath": "/basic"}
conn = None
try:
params = config()
with psycopg2.connect(**params) as conn:
with conn.cursor(name='my_cursor') as cur:
cur.itersize = 2000
cur.execute(val_json())
# row = cur.fetchone()
for row in tqdm(cur):
jsonData = json.dumps(row)
if jsonData.startswith('[') and jsonData.endswith(']'):
jsonData = jsonData[1:-1]
print(jsonData)
requests.post(url, data= jsonData, headers=headers)
sleep(1)
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
if __name__ == '__main__':
main()
Как мне создать файл и следить за ходом передачи, чтобы при повторном запуске этого сценария (после потери соединения) наборы данных выбирались с того места, где оно ранее останавливалось?
EDIT:
Oops! Я потерян где-то по пути.
Мне удалось запустить сценарии и записать ход выполнения в текстовый файл (air.txt
), который я создал вручную с содержимым 0
(в противном случае сценарий вообще не будет работать).
При запуске этого скрипта содержимое файла air.txt
обновляется со значением позиции курсора.
Проблема:
Теперь у меня проблема в том, что когда я остановил выполнение скрипта (как способ проверки) и перезапустил его снова, чтобы убедиться, что он выбирает с предыдущей позиции, скрипты начинаются с 0
, снова перезаписывая предыдущее значение (и начинают новый счет вместо того, чтобы читать это как начальную позицию).
Ниже мой обновленный скрипт:
def val_json():
db = "select to_json(d) from ( select \
a.particles_data as particles, \
a.o3_data as \"O3\", \
to_timestamp(a.seconds) as \"dateObserved\", \
l.description as name, \
json_build_object( \
'coordinates', \
json_build_array(l.node_lon, l.node_lat) \
) as location \
from airquality as a \
inner join deployment as d on \
d.deployment_id = a.deployment_id \
inner join location as l on \
l.location_id = d.location_id \
) as d"
return db
def main():
RESTART_POINT_FILE = 'air.txt'
conn = None
try:
params = config()
with open(RESTART_POINT_FILE) as fd:
rows_to_skip = int(next(fd))
#except OSError:
rows_to_skip = 0
with psycopg2.connect(**params) as conn:
with conn.cursor(name='my_cursor') as cur:
cur.itersize = 2000
cur.execute(val_json())
for processed_rows, row in enumerate(tqdm(cur)):
if processed_rows < rows_to_skip: continue
jsonData = json.dumps(row)
if jsonData.startswith('[') and jsonData.endswith(']'):
jsonData = jsonData[1:-1]
print('\n', processed_rows, '\t', jsonData)
#update progress file...
with open(RESTART_POINT_FILE, "w") as fd:
print(processed_rows, file=fd)
sleep(1)
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
if __name__ == '__main__':
main()