python фильтр + многопроцессорность + итератор ленивая загрузка - PullRequest
0 голосов
/ 12 февраля 2020

У меня есть двумерный массив, который создает огромный (> 300 ГБ) список комбинаций, поэтому я хотел бы выполнить ленивую итерацию на итераторе, созданном itertools.combination, и распараллелить эту операцию. Проблема в том, что мне нужно отфильтровать вывод, а это не поддерживается многопроцессорностью. Мой существующий обходной путь для этого требует загрузки списка комбинаций в память, что также не работает из-за размера списка.


n_nodes = np.random.randn(10, 100)
cutoff=0.3

def node_combinations(nodes):
    return itertools.combinations(list(range(len(nodes))), 2)    

def pfilter(func, candidates):
    return np.asarray([c for c, keep in zip(candidates, pool.map(func, candidates)) if keep])

def pearsonr(xy: tuple):
    correlation_coefficient = scipy.stats.pearsonr(n_nodes[xy[0]], n_nodes[xy[1]])[0]
    if correlation_coefficient >= cutoff:
            return True
        else:
            return False


edgelist = pfilter(pearsonr, node_combinations(n_nodes))

Я ищу способ выполнить ленивую оценку большого итератора, используя многопроцессорную обработку с фильтром вместо отображения.

Ответы [ 2 ]

0 голосов
/ 12 февраля 2020

Предложение Ходжи прекрасно работает - спасибо!

@ Дэн, проблема в том, что даже пустые списки занимают память, а х42 млрд пар - почти 3 ТБ в памяти.

вот моя реализация:

import more_itertools
import itertools
import multiprocessing as mp
import numpy as np
import scipy
from tqdm import tqdm

n_nodes = np.random.randn(10, 100)
num_combinations = int((int(n_nodes.shape[0]) ** 2) - int(n_nodes.shape[0]) // 2)
cpu_count = 8
cutoff=0.3

def node_combinations(nodes):
    return itertools.combinations(list(range(len(nodes))), 2)    

def edge_gen(xy_iterator: type(itertools.islice)):
    edges = []
    for cand in tqdm(xy_iterator, total=num_combinations//cpu_count)
        if pearsonr(cand):
            edges.append(cand)

def pearsonr(xy: tuple):
    correlation_coefficient = scipy.stats.pearsonr(n_nodes[xy[0]], n_nodes[xy[1]])[0]
    if correlation_coefficient >= cutoff:
            return True
        else:
            return False


slices = more_itertools.distribute(cpu_count), node_combinations(n_nodes))
pool = mp.Pool(cpu_count)
results = pool.imap(edge_gen, slices)
pool.close()
pool.join()

0 голосов
/ 12 февраля 2020

В следующем примере используется семафор для замедления чрезмерного потока пула. Не правильное решение, так как оно не решает другие проблемы, такие как то, что вложенные циклы, которые используют тот же пул и l oop над результатом imap, имеют свои внешние задания l oop, заканчивающиеся sh до любого из работа с внутренними циклами даже начинается. Но это ограничивает использование памяти:

def slowdown(n=16):
    s = threading.Semaphore(n)
    def inner(it):
        for item in it:
            s.acquire()
            yield item
    def outer(it):
        for item in it:
            s.release()
            yield item
    return outer, inner

Это используется для переноса pool.imap как такового:

outer, inner = slowdown()
outer(pool.imap(func, inner(candidates)))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...