Как использовать MYSQL запрос select с многопоточностью без дублирования результатов запроса select? - PullRequest
0 голосов
/ 06 января 2020

Краткий контекст: я использую таблицу mysql для выбора значения, используя API + значение, я получаю результат, и результат сохраняется в той же таблице.

Проблема: Как обработать несколько строки одновременно? всякий раз, когда я использую поток для запуска функции, он выбирает одно и то же значение для каждого потока (то есть курсор возвращает одинаковое значение для каждого потока). мне нужно разное значение для каждого потока для обработки. так что я уменьшу время.

Моя программа

import requests
import os
import json
import pymysql
import threading

conn = pymysql.connect(host='localhost', user=USER, passwd=PASSWORD, db='sampledb',charset='utf8mb4',autocommit=True)

url = "http://www.someapi.com/somelink/"

cur = conn.cursor()

def main():
    cur.execute("select asset_id from getprocessid where status =%s LIMIT 1",("uploaded",))
    idofassets = cur.fetchone()[0]
    req = requests.Session()
    resp = req.get(url+str(idofassets))
    resp_json = json.loads(resp.text)
    actual = resp_json['getResponse']
    cur.execute("update getprocessid set class = %s ,status =%s where asset_id = %s",(str(actual),"completed",str(idofasset),))

while True:

    # For threading purpose i added

    thread1 = threading.Thread(target=main)
    thread2 = threading.Thread(target=main)
    thread3 = threading.Thread(target=main)

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join()
    thread2.join()
    thread3.join()

Ответы [ 2 ]

0 голосов
/ 06 января 2020

Один из самых простых способов (синтаксис приблизительный).

Каждый поток должен иметь свой собственный номер в переменной my_number, которая уникальна для всех потоков.

Добавить thread INT DEFAULT NULL поле в структуре.

Поток пытается зарезервировать одну незарезервированную запись с помощью

cur.execute("UPDATE getprocessid SET thread = %s WHERE thread IS NULL AND status=%s LIMIT 1",(my_number,"uploaded",))

Затем поток обрабатывает эту зарезервированную запись:

cur.execute("select asset_id from getprocessid where thread=%s",(my_number,))
row = cur.fetchone()
if row is not None:
    process the record

Если резервирование прошло успешно, то зарезервированная запись обрабатывается. Если другой поток перезаписал значение резервирования, ни одна запись не вернулась, и он будет обнаружен IF - код обработки пропускается, и поток пытается зарезервировать другую запись.

0 голосов
/ 06 января 2020

Ваша проблема, кажется, разделена на две основные задачи:

1 - получение результатов из таблицы getprocessid MySQL

2 - обработка результата и его обновление таблица (но разные поля)

Таким образом, один из способов оптимизировать ваш код - создать поток (это может быть основной поток), выполнить шаг 1, а затем разделить проблему на шаге 2 между вашими тремя потоками:

import requests
import os
import json
import pymysql
import threading
#you can create these dynamically if you 
#want more (or less) threads
batches = [[], [], []]

conn = pymysql.connect(host='localhost', user=USER, 
  passwd=PASSWORD, 
db='sampledb',charset='utf8mb4',autocommit=True)

url = "http://www.someapi.com/somelink/"

cur = conn.cursor()

def fetch_and_split():
    cur.execute("select asset_id from getprocessid 
      where status =%s LIMIT 1",("uploaded",))
    results = cur.fetchall()
    count = 0
    #this populates the lists to be processed with the ids
    while count < size(results):
        cur_batch = batches[size(batches) % count ]
        cur_batch.append(results[count][0])
        count++

def process_and_update(batch):
    #each thread receives its own list
    for idofassets in batch:
        req = requests.Session()
        resp = req.get(url+str(idofassets))
        resp_json = json.loads(resp.text)
        actual = resp_json['getResponse']
        cur.execute("update getprocessid set class = %s 
          ,status =%s where asset_id = %s", 
          (str(actual),"completed",str(idofasset),))


while True:

    # For threading purpose i added
    # The main thread splits the results
    fetch_and_split()    
    # The other threads process the 
    # results and update the values
    thread1 = threading.Thread(target=process_and_update, args=(batches[0],))
    thread2 = threading.Thread(target=process_and_update, args=(batches[1],))
    thread3 = threading.Thread(target=process_and_update, args=(batches[2],))

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join()
    thread2.join()
    thread3.join()
...