В Python Pool одновременно работает более n рабочих и программа завершает работу, прежде чем все потоки будут выполнены - PullRequest
0 голосов
/ 17 мая 2019

Предполагается, что программа использует API translate-python для перевода ENGLISH_DICT на более чем 60 языков (ENGLISH_DICT был сокращен, как и LANG_CODES). Перевод огромного словаря на 60+ языков занимает около 2 часов с синхронизированным кодированием, поэтому я хотел использовать потоки.

Предполагается, что мой пул потоков имеет размер 4, но иногда мне удается запустить 10 потоков без завершения предыдущих потоков (выясните это, поместив оператор печати в первую и последнюю строку обработчика потоков). Кроме того, пул будет запускать несколько потоков, но как только несколько потоков завершатся, вся программа завершится, и я получу код завершения 0. И наконец, если мой максимальный размер пула равен 10, и у меня менее 10 потоков, программа немедленно завершает работу.

Более 4 запущенных потоков без завершения предыдущих

Только 8 потоков закончили работать из 65, которые были запланированы для запуска

Создано 9 потоков, но максимальный размер пула потоков равен 10. Потоки начали работать, но основная программа завершила работу с кодом завершения 0

import copy
import os
import json
import threading
from multiprocessing.dummy import Pool
from queue import Queue
from translate import Translator
LANG_CODES = {"af", "ar", "bn", "bs", "bg", "yue", "ca", "fi", "fr"}
VERIFIED_LANGUAGES = {'en', 'es', 'zh'}
TOTAL_TRANSLATIONS = len(LANG_CODES) - len(VERIFIED_LANGUAGES)
NUM_OF_THREADS = 100
DIR_NAME = 'translations'

#Iterate through nested dictionaries and translate string values
#Then prints the final dictionary as JSON

def translate(english_words: dict, dest_lang: str) -> str:
    stack = []
    cache = {}
    T = Translator(provider='microsoft', from_lang='en', to_lang=dest_lang, secret_access_key=API_SECRET1)
    translated_words = copy.deepcopy(english_words)

    ##Populate dictionary with top-level keys or translate top-level words
    for key in translated_words.keys():
        value = translated_words[key]

        if type(value) == dict:
            stack.append(value)
        else:
            if value in cache:
                translated_words[key] = cache[key]
            else:
                translation = T.translate(value)
                translated_words[key] = translation
                cache[translation] = translation


    while len(stack):
        dic = stack.pop()

        for key in dic.keys():
            value = dic[key]

            if type(value) == dict:
                stack.append(value)
            else:
                if value in cache:
                    dic[key] = cache[value]
                else:
                    # print('Translating "' + value +'" for', dest_lang)
                    translation = T.translate(value)
                    # print('Done translating "' + value +'" for', dest_lang)

                    # print('Translated', value, '->', translation)
                    cache[translation] = translation
                    dic[key] = translation

    return json.dumps(translated_words, indent=4)

##GENERATES A FOLDER CALLED 'translations' WITH LOCALE JSON FILES IN THE WORKING DIRECTORY THE SCRIPT IS LAUNCHED IN WITH MULTIPLE THREADS WORKING ON DIFFERENT LANGUAGES
def generate_translations(english_dict: dict):
    if not os.path.exists(DIR_NAME):
        os.mkdir(DIR_NAME)

    finished_langs = set(map(lambda file_name: file_name.split('.json')[0], os.listdir(DIR_NAME)))
    LANG_CODES.difference_update(finished_langs)

    pool = Pool(NUM_OF_THREADS)
    thread_params = [(english_dict, lang_code) for lang_code in sorted(LANG_CODES) if not lang_code.split('-')[0] in VERIFIED_LANGUAGES]
    pool.map_async(thread_handler, thread_params)
    pool.close()
    pool.join()
    print('DONE GENERATING')

##TRANSLATES AN ENTIRE DICTIONARY AND THEN WRITES IT TO A FILE IN THE TRANSLATION FOLDER
def thread_handler(params: tuple):
    english_dict, lang_code = params

    print('Translating for lang_code: ', lang_code)
    translated_string_json = translate(english_dict, lang_code)
    print('done getting string for', lang_code)
    file = open(DIR_NAME + '/' + lang_code + '.json', 'w')
    file.write(translated_string_json)
    file.close()
    num_of_langs_remaining = TOTAL_TRANSLATIONS - len(os.listdir(DIR_NAME))
    print('Done translating for lang_code: ' + lang_code +'.', num_of_langs_remaining, 'remaining.\n\n')


ENGLISH_DICT = {
  "changePassword": {
    "yourCurrentPassword": "Your current password",
    "newPassword": "New password",
    "reenterNewPassword": "Re-enter new password",
    "changePassword": "Change Password",
    "yourProfile": "Your Profile",
    "emptyFieldAlert": {
      "header": "All fields must not be empty",
      "body": "Please fill in all the fields"
    }
  }
}

if __name__ == '__main__':
    generate_translations(ENGLISH_DICT)

1 Ответ

0 голосов
/ 17 мая 2019

Потоки в Python не равны Потокам, например, в Java.На самом деле они не используют несколько процессорных ядер для параллельного выполнения вашего кода.Вместо этого используется многопроцессорная обработка.

multiprocessing.dummy просто использует API из многопроцессорной обработки, но на самом деле является оболочкой для многопоточности .Вы должны использовать

from multiprocessing import Pool

вместо этого для фактического распараллеливания и повышения производительности.

Вы должны посчитать количество потоков с помощью

print(len(active_children()))

Если вы не используете AsyncResult или обратный вызов, тогда вы должны просто использовать

map(thread_handler, thread_params)

вместо

map_async(thread_handler, thread_params)

, потому что он все равно работает параллельно и даже блокирует, пока не выполнит заданную задачу.

Протокол менеджера контекста также работает с Пулом:

with Pool(NUM_OF_THREADS) as pool:
    thread_params = [(english_dict, lang_code) for lang_code in sorted(LANG_CODES) if not lang_code.split('-')[0] in VERIFIED_LANGUAGES]
    pool.map(thread_handler, thread_params)
print('DONE GENERATING')
...