Python - Добавление заданий в многопроцессорный пул по частям - PullRequest
0 голосов
/ 20 января 2020

В настоящее время мы добавляем слишком много заданий в pool и начинаем их выполнять в многопроцессорных системах. Однако мы сталкиваемся с проблемой памяти из-за большого количества параметров оптимизации и очень большого набора данных. Набор данных равен 110 MB до вычислений (он увеличивается), и в настоящее время мы используем более 3 миллионов возможных комбинаций параметров. Это потребляет все доступные RAM (у нас 128 ГБ ОЗУ) и процессы останавливаются на каком-то уровне.

Возможно ли добавлять задания в пул по частям, чтобы выполненные задания удалялись из ОЗУ? Еще один вариант - постоянно проверять пул, и, если в пуле ничего нет, добавьте, скажем, 1000 новых заданий.

Возможно ли это?

Это способ добавления заданий в пул. и выполните их:

def loop(self, parameters, parameter_index, pool, execute_method):
    parameter = list(parameters.values())[parameter_index]

    value = parameter["start_value"]

    temp_value = Decimal(str(value))
    while temp_value <= Decimal(str(parameter["end_value"])):
        parameter["current_loop_value"] = value
        value += parameter["step_size"]
        temp_value = temp_value + Decimal(str(parameter["step_size"]))

        if parameter_index + 1 < len(parameters):
            self.loop(parameters, parameter_index + 1, pool, execute_method)
            continue

        self.thread_count += 1

        current_parameter = copy.deepcopy(parameters)
        current_parameter["index"] = self.thread_count
        execute_method(pool, current_parameter)

        if self.thread_count % self.thread_count_for_memory_check == 0:
            while True:
                if psutil.virtual_memory().percent < self.memory_leak_threshold:
                    break
                time.sleep(self.memory_leak_sleep_time)
                print("sleeping for {} seconds.... ".format(self.memory_leak_sleep_time), psutil.virtual_memory())

def execute_in_pool(self, pool, parameters):
    pool.apply_async(self.execute_strategy, args=(self.dataset_dict, parameters, self.thread_count),
                     callback=self.thread_callback)
...