Как использовать многопроцессорность для удаления дубликатов в очень большом списке? - PullRequest
4 голосов
/ 16 января 2020

Допустим, у меня есть огромный список, содержащий случайные числа, например

L = [random.randrange(0,25000000000) for _ in range(1000000000)]

Мне нужно избавиться от дубликатов в этом списке

Я написал этот код для списков, содержащих меньшие число элементов

def remove_duplicates(list_to_deduplicate):
seen = set()
result=[]
for i in list_to_deduplicate:
    if i not in seen:
        result.append(i)
        seen.add(i)
return result

В приведенном выше коде я создаю набор, чтобы я мог запомнить, какие числа уже появились в списке, над которым я работаю, если число не входит в набор, то я добавляю его в список результатов, который мне нужно вернуть и сохранить в наборе, чтобы он больше не добавлялся в список результатов

Теперь для 1000000 номеров в списке все хорошо, я могу быстро получить результат, но для чисел Превосходя, скажем, 1000000000 проблем, мне нужно использовать разные ядра на моей машине, чтобы попытаться устранить проблему, а затем объединить результаты нескольких процессов

Моим первым предположением было сделать набор доступным для всех процессов. но возникнет много сложностей. Как процесс может читать, а может быть, другой добавляет в набор, и я даже не знаю, возможно ли это чтобы разделить набор между процессами, я знаю, что мы можем использовать Очередь или канал, но я не уверен, как его использовать

Может кто-нибудь дать мне совет о том, как лучше всего решить эту проблему? Я открыт для любой новой идеи

Ответы [ 2 ]

3 голосов
/ 16 января 2020

Я скептически c даже ваш самый большой список достаточно велик, чтобы многопроцессорная обработка могла улучшить время. Использование numpy и многопоточности , вероятно, является вашим лучшим шансом.

Многопроцессорная обработка приводит к некоторым накладным расходам и увеличивает потребление памяти, как @Frank Merrow, как справедливо упоминалось ранее. Однако это не относится к многопоточности. Важно не смешивать эти термины, потому что процессы и потоки не совпадают. Потоки в одном и том же процессе совместно используют свою память, а разные процессы - нет.

Проблема с многоядерностью в Python заключается в GIL , который не допускает использование нескольких потоков (в тот же процесс) для параллельного выполнения Python байт-кода. Некоторые расширения C, например numpy, могут высвобождать GIL, что позволяет использовать многоядерный параллелизм с многопоточностью. Вот ваш шанс ускорить значительное улучшение, просто используя numpy.

from multiprocessing.dummy import Pool  # .dummy uses threads
import numpy as np

r = np.random.RandomState(42).randint(0, 25000000000, 100_000_000)
n_threads = 8

result = np.unique(np.concatenate(
    Pool(n_threads).map(np.unique, np.array_split(r, n_threads)))
).tolist()

Используйте numpy и пул потоков, разделите массив, создайте подмассивы уникальны в отдельных потоках, затем объединяют вложенные массивы и снова делают рекомбинированный массив еще более уникальным. Окончательное отбрасывание дубликатов для рекомбинированного массива необходимо, поскольку в под-массивах можно идентифицировать только локальные дубликаты.

Для данные с низкой энтропией (много дубликатов) использование pandas.unique вместо numpy.unique может быть намного быстрее. В отличие от numpy.unique он также сохраняет порядок появления.

Обратите внимание, что использование пула потоков, как описано выше, имеет смысл только в том случае, если функция numpy имеет значение , но еще не пронизывал под капотом , вызывая низкоуровневые математические библиотеки. Поэтому всегда проверяйте, действительно ли это улучшает производительность, и не принимайте это как должное.


Протестировано с 100М случайными числами в диапазоне:

  • Высокая энтропия : 0 - 25_000_000_000 (дубликаты 199560)
  • Низкая энтропия: 0 - 1000

Код

import time
import timeit
from multiprocessing.dummy import Pool  # .dummy uses threads

import numpy as np
import pandas as pd


def time_stmt(stmt, title=None):
    t = timeit.repeat(
        stmt=stmt,
        timer=time.perf_counter_ns, repeat=3, number=1, globals=globals()
    )
    print(f"\t{title or stmt}")
    print(f"\t\t{min(t) / 1e9:.2f} s")


if __name__ == '__main__':

    n_threads = 8  # machine with 8 cores (4 physical cores)

    stmt_np_unique_pool = \
"""
np.unique(np.concatenate(
    Pool(n_threads).map(np.unique, np.array_split(r, n_threads)))
).tolist()
"""

    stmt_pd_unique_pool = \
"""
pd.unique(np.concatenate(
    Pool(n_threads).map(pd.unique, np.array_split(r, n_threads)))
).tolist()
"""
    # -------------------------------------------------------------------------

    print(f"\nhigh entropy (few duplicates) {'-' * 30}\n")
    r = np.random.RandomState(42).randint(0, 25000000000, 100_000_000)

    r = list(r)
    time_stmt("list(set(r))")

    r = np.asarray(r)
    # numpy.unique
    time_stmt("np.unique(r).tolist()")
    # pandas.unique
    time_stmt("pd.unique(r).tolist()")    
    # numpy.unique & Pool
    time_stmt(stmt_np_unique_pool, "numpy.unique() & Pool")
    # pandas.unique & Pool
    time_stmt(stmt_pd_unique_pool, "pandas.unique() & Pool")

    # ---
    print(f"\nlow entropy (many duplicates) {'-' * 30}\n")
    r = np.random.RandomState(42).randint(0, 1000, 100_000_000)

    r = list(r)
    time_stmt("list(set(r))")

    r = np.asarray(r)
    # numpy.unique
    time_stmt("np.unique(r).tolist()")
    # pandas.unique
    time_stmt("pd.unique(r).tolist()")
    # numpy.unique & Pool
    time_stmt(stmt_np_unique_pool, "numpy.unique() & Pool")
    # pandas.unique() & Pool
    time_stmt(stmt_pd_unique_pool, "pandas.unique() & Pool")

Как вы можете видеть ниже, просто используя numpy без многопоточности уже составляет наибольшее улучшение производительности. Также обратите внимание, что pandas.unique() быстрее, чем numpy.unique() (только) для многих дубликатов.

high entropy (few duplicates) ------------------------------

    list(set(r))
        32.76 s
    np.unique(r).tolist()
        12.32 s
    pd.unique(r).tolist()
        23.01 s
    numpy.unique() & Pool
        9.75 s
    pandas.unique() & Pool
        28.91 s

low entropy (many duplicates) ------------------------------

    list(set(r))
        5.66 s
    np.unique(r).tolist()
        4.59 s
    pd.unique(r).tolist()
        0.75 s
    numpy.unique() & Pool
        1.17 s
    pandas.unique() & Pool
        0.19 s
0 голосов
/ 16 января 2020

Не могу сказать, что мне это нравится, но это должно сработать по моде.

Разделить данные на N только для чтения. Распределите по одному на каждого работника для исследования данных. Все доступно только для чтения, поэтому всем можно поделиться. Каждый работник i 1 ... N сверяет свой список со всеми другими «будущими» списками i + 1 ... N

Каждый работник i поддерживает битовую таблицу для своих i + 1 ... N списков, отмечая, попадет ли какой-либо из ее элементов в какой-либо из будущих элементов.

Когда все готово, рабочий i отправляет свою таблицу битов обратно мастеру, где tit может быть AND ed. нули затем удаляются. Нет сортировки нет наборов. Проверка не быстрая, хотя.

Если вы не хотите беспокоиться о множественных битовых таблицах, вы можете позволить каждому работнику i записывать нули, когда они находят дубликат выше своей области ответственности. ОДНАКО, теперь вы столкнулись с реальными проблемами с общей памятью . В этом отношении, вы могли бы даже позволить каждой работе просто удалить dup над своим регионом, но то же самое.

Даже разделение работы напрашивается на вопрос. Каждый работник обходится дорого, хотя список всех остальных для каждой своей записи. * (N-1) LEN (область) / 2 . Каждый работник может создать набор своего региона или отсортировать свой регион. Любой из них разрешил бы более быстрые проверки, но затраты складываются.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...