Как я могу использовать многопоточность (или многопроцессорность?) Для более быстрой загрузки данных? - PullRequest
0 голосов
/ 22 января 2019

У меня есть список вопросов (проблемы с jira):

listOfKeys = [id1,id2,id3,id4,id5...id30000]

Я хочу получить рабочие журналы по этим вопросам, для этого я использовал библиотеку jira-python и этот код:

listOfWorklogs=pd.DataFrame()                 (I used pandas (pd) lib)
lst={}                                       #dictionary for help, where the worklogs will be stored
for i in range(len(listOfKeys)):
    worklogs=jira.worklogs(listOfKeys[i])    #getting list of worklogs
    if(len(worklogs)) == 0:
        i+=1
    else:
        for j in range(len(worklogs)):
            lst = {
                    'self': worklogs[j].self,  
                    'author': worklogs[j].author,
                    'started': worklogs[j].started,
                    'created': worklogs[j].created,
                    'updated': worklogs[j].updated,
                    'timespent': worklogs[j].timeSpentSeconds
                }
            listOfWorklogs = listOfWorklogs.append(lst, ignore_index=True)
########### Below there is the recording to the .xlsx file ################

поэтому я просто вхожу в рабочий журнал каждой проблемы в простом цикле, который эквивалентен ссылке на ссылку: https://jira.mycompany.com/rest/api/2/issue/issueid/worklogs и получение информации по этой ссылке

Проблема в том, что таких проблем более 30 000. и цикл слишком медленный (примерно 3 секунды на 1 выпуск) Можно ли как-то запустить несколько циклов / процессов / потоков параллельно, чтобы ускорить процесс получения рабочих журналов (возможно, без библиотеки jira-python)?

Ответы [ 3 ]

0 голосов
/ 22 января 2019

У меня есть некоторые проблемы ((

1) в коде, где появляется первый цикл «for» и начинается первая инструкция «if» (эта инструкция и все, что ниже, должны быть включены в цикл,верно?)

for i in range(len(listOfKeys)-99):
    worklogs=jira.worklogs(listOfKeys[i])    #getting list of worklogs
    if(len(worklogs)) == 0:
    ....

2) cmd, приглашение conda и Spyder не позволили вашему коду работать по причине: Ошибка многопроцессорной обработки Python: AttributeError: модуль '__ main__' не имеет атрибута * spec 'После исследования в Google мне пришлось установить немного выше в коде: spec = Нет (но я не уверен, что это правильно), и эта ошибка исчезла.Кстати, код в Jupyter Notebook работал без этой ошибки, но listOfWorklogs пуст, и это неправильно.

3), когда я исправил отступы и установил __spec __ = None, в этом месте произошла новая ошибка: обрабатывает ошибку [i] .start (), например: «PicklingError: невозможно выбрать: поиск атрибута PropertyHolder на jira.resources не выполнен»

, если я уберу скобки из методов start и join, кодбудет работать, но у меня не будет никаких записей в спискеOfWorklogs (((

Я снова прошу вашей помощи!)

0 голосов
/ 22 января 2019

Как насчет того, чтобы думать об этом не с технической точки зрения, а с логической точки зрения?Вы знаете, что ваш код работает, но со скоростью 3 секунды на 1 проблему, что означает, что для его завершения потребуется 25 часов.Если у вас есть возможность разделить # проблем Jira, которые передаются в сценарий (возможно, использовать дату или ключ выпуска и т. Д.), Вы можете создать несколько разных файлов .py с практически одинаковым кодом, вы просто передадите каждую из них.другой список билетов Jira.Таким образом, вы можете просто запустить 4 из них одновременно и сократить время до 6,25 часа каждый.

0 голосов
/ 22 января 2019

Я переработал кусок кода, который я сделал в ваш код, надеюсь, это поможет:

from multiprocessing import Manager, Process, cpu_count

def insert_into_list(worklog, queue):
    lst = {
        'self': worklog.self,  
        'author': worklog.author,
        'started': worklog.started,
        'created': worklog.created,
        'updated': worklog.updated,
        'timespent': worklog.timeSpentSeconds
    }
    queue.put(lst)
    return

# Number of cpus in the pc
num_cpus = cpu_count()
index = 0

# Manager and queue to hold the results
manager = Manager()

# The queue has controlled insertion, so processes don't step on each other
queue = manager.Queue()

listOfWorklogs=pd.DataFrame()
lst={}                                       
for i in range(len(listOfKeys)):
    worklogs=jira.worklogs(listOfKeys[i])    #getting list of worklogs
if(len(worklogs)) == 0:
    i+=1
else:

    # This loop replaces your "for j in range(len(worklogs))" loop
    while index < len(worklogs):
        processes = []
        elements = min(num_cpus, len(worklogs) - index)

        # Create a process for each cpu
        for i in range(elements):
            process = Process(target=insert_into_list, args=(worklogs[i+index], queue))
            processes.append(process)

        # Run the processes
        for i in range(elements):
            processes[i].start()

        # Wait for them to finish
        for i in range(elements):
            processes[i].join(timeout=10)

        index += num_cpus

    # Dump the queue into the dataframe
    while queue.qsize() != 0:
        listOfWorklogs.append(q.get(), ignore_index=True)

Это должно сработать и сократить время в несколько раз меньше, чем количество процессоров в вашей машине. Вы можете попробовать изменить это число вручную для повышения производительности. В любом случае мне очень странно, что на одну операцию уходит около 3 секунд.

PS: я не смог попробовать код, потому что у меня нет примеров, возможно, есть ошибки

...