Thread.getName () не представляет фактическое имя работника - PullRequest
0 голосов
/ 10 апреля 2019

Я написал скрипт на python, который импортирует рыночные данные в базу данных MariaDB.Чтобы ускорить импорт, я решил использовать модуль Threading.Поэтому сначала функция заполняет очередь URL-адресами, с которых данные загружаются и импортируются в мою базу данных.К сожалению, функция импорта, кажется, обрабатывается только одним потоком вместо многих.

import queue
from threading import Thread

num_threads = 4
threads = []
urls = queue.Queue()

def create_url():    

   ...
   getlist of items
   ...

   for row in item_list:
      url = 'https://someurl=' + str(row[0])
      urls.put(url)

   return urls


def import_mo(urls):
    station_id = 60003760

    print(worker.getName())

    try:
        mariadb_connection = mariadb.connect(allthedbstuff)
        cursor = mariadb_connection.cursor()

        while (True):
            url = urls.get()
            print("%s processes %s queue# %s" % (worker.getName(), url, urls.qsize()))
            if url == None:
                break
            s = requests.Session()
            retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
            s.mount('https://', HTTPAdapter(max_retries=retries))
            jsonraw = s.get(url)
            jsondata = ujson.loads(jsonraw.text)

            for row in jsondata:
                if (row['location_id'] == station_id):
                    cursor.execute(
                        'INSERT INTO tbl_mo_tmp (order_id) VALUES (%s)', (row['order_id'], ))

                cursor.execute('SELECT order_id from tbl_mo WHERE order_id = %s',
                               (row['order_id'], ))
                exists_mo = cursor.fetchall()

                if len(exists_mo) != 0:
                    # print("updating order#", row['order_id'])
                    cursor.execute('UPDATE tbl_mo SET volume = %s, price = %s WHERE order_id = %s',
                                   (row['volume_remain'], row['price'], row['order_id'], ))
                    mariadb_connection.commit()
                else:
                    if (row['location_id'] == station_id):
                        # print("newly inserting order#", row['order_id'])
                        cursor.execute('INSERT INTO tbl_mo (type_id, order_id, ordertype,volume, price) VALUES (%s,%s,%s,%s,%s)',
                                       (row['type_id'], row['order_id'], row['is_buy_order'], row['volume_remain'], row['price'], ))
                    mariadb_connection.commit()
            urls.task_done()

    except mariadb.Error as error:
        mariadb_connection.rollback()  # rollback if any exception occured

    finally:
        # closing database connection.
        if mariadb_connection.is_connected():
            cursor.close()
            mariadb_connection.close()

def cleanup_mo():
   ...
   do cleanup stuff
   ...

create_url()

for i in range(num_threads):
    worker = Thread(target=import_mo, args=(urls,))
    worker.setDaemon(True)
    threads.append(worker)
    worker.start()


for i in range(num_threads):
    urls.put(None)

for worker in threads:
    worker.join()

cleanup_mo()

Выходные состояния в начале:

Thread-1
Thread-2
Thread-3
Thread-4

, который показывает мне, что создано 4 отдельных работника, но при входе в цикл while создается впечатление, что только один рабочий действительно обрабатывает извлеченные URL-адреса.

Thread-1 processes https://someurl=2 queue# 32
Thread-1 processes https://someurl=3 queue# 31
Thread-1 processes https://someurl=4 queue# 30
Thread-1 processes https://someurl=5 queue# 29
Thread-1 processes https://someurl=6 queue# 28
Thread-1 processes https://someurl=7 queue# 27
Thread-1 processes https://someurl=8 queue# 26
Thread-1 processes https://someurl=9 queue# 25
Thread-1 processes https://someurl=10 queue# 24
Thread-1 processes https://someurl=11 queue# 23
Thread-1 processes https://someurl=12 queue# 22
Thread-1 processes https://someurl=13 queue# 21
Thread-1 processes https://someurl=14 queue# 20
Thread-1 processes https://someurl=15 queue# 19
Thread-1 processes https://someurl=16 queue# 18
Thread-1 processes https://someurl=17 queue# 17
Thread-1 processes https://someurl=18 queue# 16

Я ожидаю, что результат будет выглядеть (в идеале):

Thread-1 processes https://someurl=2 queue# 32
Thread-2 processes https://someurl=3 queue# 31
Thread-3 processes https://someurl=4 queue# 30
Thread-4 processes https://someurl=5 queue# 29

Что яздесь не хватает?

1 Ответ

0 голосов
/ 10 апреля 2019

Чтобы напечатать разные «имена» для каждого работника:

def import_mo(i, urls):
    station_id = 60003760

    print('Worker', i)
    # etc
    # later:
        print("Worker %s processes %s queue# %s" % (i, url, urls.qsize()))

и создание тем:

for i in range(num_threads):
    worker = Thread(target=import_mo, args=(i,urls,))
    worker.setDaemon(True)
    threads.append(worker)
    worker.start()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...