Я написал скрипт на python, который импортирует рыночные данные в базу данных MariaDB.Чтобы ускорить импорт, я решил использовать модуль Threading.Поэтому сначала функция заполняет очередь URL-адресами, с которых данные загружаются и импортируются в мою базу данных.К сожалению, функция импорта, кажется, обрабатывается только одним потоком вместо многих.
import queue
from threading import Thread
num_threads = 4
threads = []
urls = queue.Queue()
def create_url():
...
getlist of items
...
for row in item_list:
url = 'https://someurl=' + str(row[0])
urls.put(url)
return urls
def import_mo(urls):
station_id = 60003760
print(worker.getName())
try:
mariadb_connection = mariadb.connect(allthedbstuff)
cursor = mariadb_connection.cursor()
while (True):
url = urls.get()
print("%s processes %s queue# %s" % (worker.getName(), url, urls.qsize()))
if url == None:
break
s = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
s.mount('https://', HTTPAdapter(max_retries=retries))
jsonraw = s.get(url)
jsondata = ujson.loads(jsonraw.text)
for row in jsondata:
if (row['location_id'] == station_id):
cursor.execute(
'INSERT INTO tbl_mo_tmp (order_id) VALUES (%s)', (row['order_id'], ))
cursor.execute('SELECT order_id from tbl_mo WHERE order_id = %s',
(row['order_id'], ))
exists_mo = cursor.fetchall()
if len(exists_mo) != 0:
# print("updating order#", row['order_id'])
cursor.execute('UPDATE tbl_mo SET volume = %s, price = %s WHERE order_id = %s',
(row['volume_remain'], row['price'], row['order_id'], ))
mariadb_connection.commit()
else:
if (row['location_id'] == station_id):
# print("newly inserting order#", row['order_id'])
cursor.execute('INSERT INTO tbl_mo (type_id, order_id, ordertype,volume, price) VALUES (%s,%s,%s,%s,%s)',
(row['type_id'], row['order_id'], row['is_buy_order'], row['volume_remain'], row['price'], ))
mariadb_connection.commit()
urls.task_done()
except mariadb.Error as error:
mariadb_connection.rollback() # rollback if any exception occured
finally:
# closing database connection.
if mariadb_connection.is_connected():
cursor.close()
mariadb_connection.close()
def cleanup_mo():
...
do cleanup stuff
...
create_url()
for i in range(num_threads):
worker = Thread(target=import_mo, args=(urls,))
worker.setDaemon(True)
threads.append(worker)
worker.start()
for i in range(num_threads):
urls.put(None)
for worker in threads:
worker.join()
cleanup_mo()
Выходные состояния в начале:
Thread-1
Thread-2
Thread-3
Thread-4
, который показывает мне, что создано 4 отдельных работника, но при входе в цикл while создается впечатление, что только один рабочий действительно обрабатывает извлеченные URL-адреса.
Thread-1 processes https://someurl=2 queue# 32
Thread-1 processes https://someurl=3 queue# 31
Thread-1 processes https://someurl=4 queue# 30
Thread-1 processes https://someurl=5 queue# 29
Thread-1 processes https://someurl=6 queue# 28
Thread-1 processes https://someurl=7 queue# 27
Thread-1 processes https://someurl=8 queue# 26
Thread-1 processes https://someurl=9 queue# 25
Thread-1 processes https://someurl=10 queue# 24
Thread-1 processes https://someurl=11 queue# 23
Thread-1 processes https://someurl=12 queue# 22
Thread-1 processes https://someurl=13 queue# 21
Thread-1 processes https://someurl=14 queue# 20
Thread-1 processes https://someurl=15 queue# 19
Thread-1 processes https://someurl=16 queue# 18
Thread-1 processes https://someurl=17 queue# 17
Thread-1 processes https://someurl=18 queue# 16
Я ожидаю, что результат будет выглядеть (в идеале):
Thread-1 processes https://someurl=2 queue# 32
Thread-2 processes https://someurl=3 queue# 31
Thread-3 processes https://someurl=4 queue# 30
Thread-4 processes https://someurl=5 queue# 29
Что яздесь не хватает?