Я заменил драйвер sqlite3 на psycopg2 в потоке ThreadPoolExecutor (поток делает запросы к удаленному сервису, получает статусы и обновляет поля в таблице):
global conn
global c
conn = psycopg2.connect(DB_URL)
c = conn.cursor()
def load_url(id, data):
global counter_valid
global counter_invalid
global counter_errors
try:
headers = {"Authorization": "Bearer XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", "Content-Type" : "application/json; charset=utf-8"}
offer = {"userid": "XXXXXXXXXXXXXXXXXXX", "country": data['country'], "url": data['url'], "platform": data['device']}
print offer
r = requests.post("https://api.offertest.net/offertest", headers=headers, data=json.dumps(offer))
response = r.json()
print id
if 'code' in response:
if 'code' == 415:
counter_errors += 1
print "The problem: ", response['message'], data
c.execute(
"UPDATE offers SET error = %s WHERE id = %s", (response['message'], id))
print c.rowcount
conn.commit()
if 'status' in response:
if response['status'] == u'ERROR':
print "Some error with inputs: ", data
c.execute(
"UPDATE offers SET error = %s WHERE id = %s", ("Error with inputs", id))
print c.rowcount
conn.commit()
else:
redirections = int(response['nRedir'])
print "Amount of redirects:", redirections
c.execute("UPDATE offers SET redirections = %s WHERE id = %s", (redirections, id))
conn.commit()
status = 'valid' if response['bundleIdMatch'] else 'invalid'
if response['bundleIdMatch']:
counter_valid += 1
else:
counter_invalid += 1
print status
time.sleep(1)
c.execute(
"UPDATE offers SET valid = %s, error = NULL WHERE id = %s", (status, id))
print c.rowcount
conn.commit()
# conn.close()
print response['bundleIdMatch']
return response
except Exception as e:
raise e
# ipdb.set_trace()
Вот сам ThreadPoolExecutor:
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
future_to_url = {executor.submit(load_url, data['id'], data): data for data in datas}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
В течение некоторого времени шаблон ThreadPool работает правильно, но внезапно в середине работы он запускает исключения, подобные следующим:
org.postgresql.util.PSQLException: сгенерированные данныеисключение: текущая транзакция прервана, команды игнорируются до конца блока транзакции
Как изменить load_url для корректной обработки этого исключения?