текущая транзакция прервана, команды игнорируются до конца блока транзакции - PullRequest
0 голосов
/ 29 января 2019

Я заменил драйвер sqlite3 на psycopg2 в потоке ThreadPoolExecutor (поток делает запросы к удаленному сервису, получает статусы и обновляет поля в таблице):

global conn
global c
conn = psycopg2.connect(DB_URL)
c = conn.cursor()

def load_url(id, data):
    global counter_valid
    global counter_invalid
    global counter_errors
    try:
        headers = {"Authorization": "Bearer XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", "Content-Type" : "application/json; charset=utf-8"}
        offer = {"userid": "XXXXXXXXXXXXXXXXXXX", "country": data['country'], "url": data['url'], "platform": data['device']}
        print offer
        r = requests.post("https://api.offertest.net/offertest", headers=headers, data=json.dumps(offer))

        response = r.json()

        print id

        if 'code' in response:
            if 'code' == 415:
                counter_errors += 1
                print "The problem: ", response['message'], data

                c.execute(
                    "UPDATE offers SET error = %s WHERE id = %s", (response['message'], id))
                print c.rowcount
                conn.commit()
        if 'status' in response:
            if response['status'] == u'ERROR':
                print "Some error with inputs: ", data
                c.execute(
                        "UPDATE offers SET error = %s WHERE id = %s", ("Error with inputs", id))
                print c.rowcount
                conn.commit()
            else:
                redirections = int(response['nRedir'])

                print "Amount of redirects:", redirections
                c.execute("UPDATE offers SET redirections = %s WHERE id = %s", (redirections, id))
                conn.commit()
                status = 'valid' if response['bundleIdMatch'] else 'invalid'
                if response['bundleIdMatch']:
                    counter_valid += 1
                else:
                    counter_invalid += 1
                print status
                time.sleep(1)
                c.execute(
                    "UPDATE offers SET valid = %s, error = NULL WHERE id = %s", (status, id))
                print c.rowcount
                conn.commit()
                # conn.close()
                print response['bundleIdMatch']

        return response
    except Exception as e:
        raise e
        # ipdb.set_trace()

Вот сам ThreadPoolExecutor:

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    future_to_url = {executor.submit(load_url, data['id'], data): data for data in datas}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))

В течение некоторого времени шаблон ThreadPool работает правильно, но внезапно в середине работы он запускает исключения, подобные следующим:

org.postgresql.util.PSQLException: сгенерированные данныеисключение: текущая транзакция прервана, команды игнорируются до конца блока транзакции

Как изменить load_url для корректной обработки этого исключения?

...