_wait_for_tstate_lock ошибка в многопоточном приложении - PullRequest
2 голосов
/ 21 апреля 2019

Я написал скрипт на python, который импортирует рыночные данные в базу данных MariaDB. Для ускорения импорта я решил использовать модуль Threading. Поэтому сначала функция заполняет очередь URL-адресами, с которых данные загружаются и импортируются в мою базу данных. У меня нет проблем, если сценарии запускаются в первый раз, независимо от количества потоков, поэтому доступные данные импортируются впервые. При повторном запуске сохраненные данные будут проверены и обновлены при необходимости. Иначе это будет проигнорировано.

Как ни странно, я получаю сообщение об ошибке, когда скрипт выполняется более чем с 1 потоком, и во второй раз, где задействованы механизмы обновления.

Я борюсь с этой проблемой уже более недели, и я помогу вам подсказать, в чем может быть проблема.

#!/usr/local/bin/python3
import requests
import queue
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import ujson
from datetime import datetime
import mysql.connector as mariadb
from mysql.connector import Error
from mysql.connector import errorcode
from threading import Thread
import time

num_threads = 4
threads = []
urls = queue.Queue()


def create_url():
    try:
        mariadb_connection = mariadb.connect(dbstuff)
        cursor = mariadb_connection.cursor()

        cursor.execute('SELECT type_id from tbl_items')
        item_list = cursor.fetchall()

        for row in item_list:
            url = 'https://esi.evetech.net/latest/markets/10000002/orders/?datasource=tranquility&order_type=all&page=1&type_id=' + \
                str(row[0])
            urls.put(url)

        return urls

    except mariadb.Error as error:
        mariadb_connection.rollback()  # rollback if any exception occured
        print("Failed retrieving itemtypes from tbl_items table {}".format(error))

    finally:
        if mariadb_connection.is_connected():
            cursor.close()
            mariadb_connection.close()


def import_mo_jita(i, urls):
    station_id = 60003760

    print("worker:", i)

    try:
        mariadb_connection = mariadb.connect(dbstuff)
        cursor = mariadb_connection.cursor()

        while (True):
            url = urls.get()
            print("Worker %s processes %s queue# %s" % (i, url, urls.qsize()))
            if url == None:
                break
            s = requests.Session()
            retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
            s.mount('https://', HTTPAdapter(max_retries=retries))
            jsonraw = s.get(url)
            jsondata = ujson.loads(jsonraw.text)

            for row in jsondata:
                if (row['location_id'] == station_id):
                    cursor.execute('INSERT INTO tbl_mo_jita_esi_tmp (order_id) VALUES (%s)', (row['order_id'], ))
                    mariadb_connection.commit()
                    cursor.execute('SELECT order_id, price, volume FROM tbl_mo_jita WHERE order_id = %s', (row['order_id'], ))
                    db_data = cursor.fetchall()
                    #print (db_data)
                    if len(db_data) != 0:
                        for x in db_data:
                            db_order_id = x[0]
                            db_price = x[1]
                            db_volume = x[2]

                    if len(db_data) != 0:
                        if db_price == row['price'] and db_volume == row['volume_remain']:
                            continue
                        else:
                            print("updating order#", row['order_id'])
                            cursor.execute('UPDATE tbl_mo_jita SET volume = %s, price = %s WHERE order_id = %s', (row['volume_remain'], row['price'], row['order_id'], ))
                            mariadb_connection.commit()
                    else:
                        print("newly inserting order#", row['order_id'])
                        cursor.execute('INSERT INTO tbl_mo_jita (type_id, order_id, ordertype,volume, price) VALUES (%s,%s,%s,%s,%s)', (row['type_id'], row['order_id'], row['is_buy_order'], row['volume_remain'], row['price'], ))
                        mariadb_connection.commit()
                else:
                    continue
            urls.task_done()

    except mariadb.Error as error:
        mariadb_connection.rollback()  # rollback if any exception occured
        print("Failed retrieving itemtypes from tbl_items table {}".format(error))

    finally:
        if mariadb_connection.is_connected():
            cursor.close()
            mariadb_connection.close()


def cleanup_mo():
    try:
        mariadb_connection = mariadb.connect(dbstuff)
        cursor = mariadb_connection.cursor()

        cursor.execute('SELECT order_id FROM tbl_mo_jita')
        list_mo_sql = cursor.fetchall()
        cursor.execute('SELECT order_id FROM tbl_mo_jita_esi_tmp')
        list_mo_esi = cursor.fetchall()
        list_mo_purge = list(set(list_mo_sql)-set(list_mo_esi))
        print(len(list_mo_purge))

        for row in list_mo_purge:
            cursor.execute('DELETE FROM tbl_mo_jita WHERE order_id = %s', ((row[0]), ))

        cursor.execute('TRUNCATE tbl_mo_jita_esi_tmp')
        mariadb_connection.commit()

    except mariadb.Error as error:
        mariadb_connection.rollback()  # rollback if any exception occured
        print("Failed retrieving itemtypes from tbl_items table {}".format(error))

    finally:
        if mariadb_connection.is_connected():
            cursor.close()
            mariadb_connection.close()


create_url()

for i in range(num_threads):
    urls.put(None)

for i in range(num_threads):
    worker = Thread(target=import_mo_jita, args=(i, urls,))
    worker.setDaemon(True)
    threads.append(worker)
    worker.start()


for worker in threads:
    worker.join()

cleanup_mo()

Вывод ошибки:

Fatal Python error: Segmentation fault

Thread 0x00007f21a5079700 (most recent call first):
  File "/home/gregadmin/.local/lib/python3.6/site-packages/mysql/connector/connection_cext.py", line 329 in commit
  File "import_mo_jita.py", line 93 in import_mo_jita
  File "/usr/lib/python3.6/threading.py", line 864 in run
  File "/usr/lib/python3.6/threading.py", line 916 in _bootstrap_inner
  File "/usr/lib/python3.6/threading.py", line 884 in _bootstrap

Current thread 0x00007f21a587a700 (most recent call first):
  File "import_mo_jita.py", line 99 in import_mo_jita
  File "/usr/lib/python3.6/threading.py", line 864 in run
  File "/usr/lib/python3.6/threading.py", line 916 in _bootstrap_inner
  File "/usr/lib/python3.6/threading.py", line 884 in _bootstrap

Thread 0x00007f21a607b700 (most recent call first):
  File "import_mo_jita.py", line 87 in import_mo_jita
  File "/usr/lib/python3.6/threading.py", line 864 in run
  File "/usr/lib/python3.6/threading.py", line 916 in _bootstrap_inner
  File "/usr/lib/python3.6/threading.py", line 884 in _bootstrap

Thread 0x00007f21a687c700 (most recent call first):
  File "/usr/lib/python3.6/ssl.py", line 631 in read
  File "/usr/lib/python3.6/ssl.py", line 874 in read
  File "/usr/lib/python3.6/ssl.py", line 1012 in recv_into
  File "/usr/lib/python3.6/socket.py", line 586 in readinto
  File "/usr/lib/python3.6/http/client.py", line 258 in _read_status
  File "/usr/lib/python3.6/http/client.py", line 297 in begin
  File "/usr/lib/python3.6/http/client.py", line 1331 in getresponse
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 383 in _make_request
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 601 in urlopen
  File "/usr/lib/python3/dist-packages/requests/adapters.py", line 440 in send
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 630 in send
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 520 in request
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 533 in get
  File "import_mo_jita.py", line 87 in import_mo_jita
  File "/usr/lib/python3.6/threading.py", line 864 in run
  File "/usr/lib/python3.6/threading.py", line 916 in _bootstrap_inner
  File "/usr/lib/python3.6/threading.py", line 884 in _bootstrap

Thread 0x00007f21ad64d740 (most recent call first):
  File "/usr/lib/python3.6/threading.py", line 1072 in _wait_for_tstate_lock
  File "/usr/lib/python3.6/threading.py", line 1056 in join
  File "import_mo_jita.py", line 187 in <module>
Segmentation fault (core dumped)

1 Ответ

0 голосов
/ 21 апреля 2019

Скорее всего, у вас проблемы с клиентом БД.

  • Вы можете попробовать обновить клиента до последней доступной v8.0 .

  • Попробуйте открыть / закрыть курсор внутри цикла или даже для каждого отдельного запроса к БД.

  • Существует use_pure опция драйвера . Попробуйте тоже поиграть с ним ...

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...