Я ищу любое предложение о том, как устранить узкое место, описанное ниже.
В распределенной инфраструктуре я сопоставляю некоторые варианты будущего и получаю результаты, когда они будут готовы. После получения я должен вызвать трудоемкую, блокирующую функцию "pandas" и, к сожалению, эту функцию нельзя избежать. Оптимальным было бы иметь что-то, что позволило бы мне создать другой процесс, отделенный от for для l oop, который мог бы глотать поток результатов. Для других ограничений, не представленных в этом примере, выходные данные не могут быть сериализованы и отправлены рабочим, и их необходимо обработать на ведущем устройстве.
здесь небольшой макет. Просто поймите идею и не сосредотачивайтесь слишком на деталях кода.
class pxldrl(object):
def __init__(self, df):
self.table = df
def simulation(list_param):
time.sleep(random.random())
val = sum(list_param)/4
if val < 0.5:
result = {'param_e': val}
else:
result = {'param_f': val}
return pxldrl(result)
def costly_function(result, output):
time.sleep(1)
# blocking pandas function
output = output.append(result.table, sort=False, ignore_index=True)
return output
def main():
client = Client(n_workers=4, threads_per_worker=1)
output = pd.DataFrame(columns=['param_e', 'param_f'])
input = pd.DataFrame(np.random.random(size=(100, 4)),
columns=['param_a', 'param_b', 'param_c', 'param_d'])
for i in range(2):
futures = client.map(simulation, input.values)
for future, result in as_completed(futures, with_results=True):
output = costly_function(result, output)