Неумелое управление темными будущими результатами замедляет выступления - PullRequest
0 голосов
/ 24 января 2020

Я ищу любое предложение о том, как устранить узкое место, описанное ниже.

В распределенной инфраструктуре я сопоставляю некоторые варианты будущего и получаю результаты, когда они будут готовы. После получения я должен вызвать трудоемкую, блокирующую функцию "pandas" и, к сожалению, эту функцию нельзя избежать. Оптимальным было бы иметь что-то, что позволило бы мне создать другой процесс, отделенный от for для l oop, который мог бы глотать поток результатов. Для других ограничений, не представленных в этом примере, выходные данные не могут быть сериализованы и отправлены рабочим, и их необходимо обработать на ведущем устройстве.

здесь небольшой макет. Просто поймите идею и не сосредотачивайтесь слишком на деталях кода.

class pxldrl(object):
    def __init__(self, df):
        self.table = df

def simulation(list_param):
    time.sleep(random.random())
    val = sum(list_param)/4
    if val < 0.5:
        result = {'param_e': val}
    else:
        result = {'param_f': val}
    return pxldrl(result)

def costly_function(result, output):
    time.sleep(1)
    # blocking pandas function 
    output = output.append(result.table, sort=False, ignore_index=True)

    return output

def main():
    client = Client(n_workers=4, threads_per_worker=1)

    output = pd.DataFrame(columns=['param_e', 'param_f'])

    input = pd.DataFrame(np.random.random(size=(100, 4)),
                                columns=['param_a', 'param_b', 'param_c', 'param_d'])

    for i in range(2):

        futures = client.map(simulation, input.values)

        for future, result in as_completed(futures, with_results=True):
            output = costly_function(result, output)

1 Ответ

0 голосов
/ 25 января 2020

Звучит так, как будто вы хотите запустить costly_function в отдельном потоке. Возможно, вы могли бы использовать модуль threading или concurrent.futures для запуска всей вашей подпрограммы в отдельном потоке?

Если вы хотите получить фантазию, вы можете даже снова использовать Dask и создать второго клиента, который запускается внутри этот процесс:

local_client = Client(processes=False)

и используйте это. (хотя вы должны быть осторожны при смешивании фьючерсов между клиентами, которые не будут работать)

...