Преобразование обхода графа в многопроцессорность в Python - PullRequest
2 голосов
/ 12 августа 2010

Я работал над алгоритмом обхода графа в простой сети, и я хотел бы запустить его с использованием многопроцессорной обработки, поскольку при масштабировании по всей сети потребуется много вызовов, ограниченных вводом / выводом.Простая версия работает довольно быстро:

already_seen = {}
already_seen_get = already_seen.get

GH_add_node = GH.add_node
GH_add_edge = GH.add_edge
GH_has_node = GH.has_node
GH_has_edge = GH.has_edge


def graph_user(user, depth=0):
    logger.debug("Searching for %s", user)
    logger.debug("At depth %d", depth)
    users_to_read = followers = following = []

    if already_seen_get(user):
        logging.debug("Already seen %s", user)
        return None

    result = [x.value for x in list(view[user])]

    if result:
        result = result[0]
        following = result['following']
        followers = result['followers']
        users_to_read = set().union(following, followers)

    if not GH_has_node(user):
        logger.debug("Adding %s to graph", user)
        GH_add_node(user)

    for follower in users_to_read:
        if not GH_has_node(follower):
            GH_add_node(follower)
            logger.debug("Adding %s to graph", follower)
            if depth < max_depth:
                graph_user(follower, depth + 1)

        if GH_has_edge(follower, user):
            GH[follower][user]['weight'] += 1
        else:
            GH_add_edge(user, follower, {'weight': 1})

Это на самом деле значительно быстрее, чем моя многопроцессорная версия:

to_write = Queue()
to_read = Queue()
to_edge = Queue()
already_seen = Queue()


def fetch_user():
    seen = {}
    read_get = to_read.get
    read_put = to_read.put
    write_put = to_write.put
    edge_put = to_edge.put
    seen_get = seen.get

    while True:
        try:
            logging.debug("Begging for a user")

            user = read_get(timeout=1)
            if seen_get(user):
                continue

            logging.debug("Adding %s", user)
            seen[user] = True
            result = [x.value for x in list(view[user])]
            write_put(user, timeout=1)

            if result:
                result = result.pop()
                logging.debug("Got user %s and result %s", user, result)
                following = result['following']
                followers = result['followers']
                users_to_read = list(set().union(following, followers))

                [edge_put((user, x, {'weight': 1})) for x in users_to_read]

                [read_put(y, timeout=1) for y in users_to_read if not seen_get(y)]

        except Empty:
            logging.debug("Fetches complete")
            return


def write_node():
    users = []
    users_app = users.append
    write_get = to_write.get

    while True:
        try:
            user = write_get(timeout=1)
            logging.debug("Writing user %s", user)
            users_app(user)
        except Empty:
            logging.debug("Users complete")
            return users


def write_edge():
    edges = []
    edges_app = edges.append
    edge_get = to_edge.get

    while True:
        try:
            edge = edge_get(timeout=1)
            logging.debug("Writing edge %s", edge)
            edges_app(edge)
        except Empty:
            logging.debug("Edges Complete")
            return edges


if __name__ == '__main__':
    pool = Pool(processes=1)
    to_read.put(me)

    pool.apply_async(fetch_user)
    users = pool.apply_async(write_node)
    edges = pool.apply_async(write_edge)

    GH.add_weighted_edges_from(edges.get())
    GH.add_nodes_from(users.get())

    pool.close()
    pool.join()

Что я не могу понять, так это то, почему версия с одним процессом так многоБыстрее.Теоретически, многопроцессорная версия должна писать и читать одновременно.Я подозреваю, что в очередях есть конфликт блокировки, и это является причиной замедления, но у меня нет никаких доказательств этого.Когда я масштабирую количество процессов fetch_user, кажется, что он работает быстрее, но тогда у меня возникают проблемы с синхронизацией данных, видимых между ними.Вот некоторые мысли, которые у меня были:

  • Это даже хорошее приложение для многопроцессорной обработки?Первоначально я использовал его, потому что хотел иметь возможность извлекать данные из БД в Parallell.
  • Как избежать конфликта ресурсов при чтении и записи из одной и той же очереди?
  • Я пропустил некоторыеочевидное предостережение для дизайна?
  • Что я могу сделать, чтобы разделить таблицу поиска между читателями, чтобы я не продолжал выбирать одного и того же пользователя дважды?
  • При увеличении количества процессов выборки ониписатели в итоге запираются.Похоже, что очередь записи не записывается, но очередь чтения заполнена.Есть ли лучший способ справиться с этой ситуацией, чем с таймаутами и обработкой исключений?

1 Ответ

1 голос
/ 16 августа 2010

Очереди в Python синхронизируются.Это означает, что только один поток может одновременно выполнять чтение / запись, что определенно вызовет узкое место в вашем приложении.

Одним из лучших решений является распределение обработки на основе хеш-функции назначить обработку потокам с помощью простой операции модуля.Например, если у вас 4 потока, у вас может быть 4 очереди:

 thread_queues = []
 for i in range(4):
     thread_queues = Queue()

 for user in user_list:
    user_hash=hash(user.user_id) #hash in here is just shortcut to some standard hash utility 
    thread_id = user_hash % 4
    thread_queues[thread_id].put(user)

 # From here ... your pool of threads access thread_queues but each thread ONLY accesses 
 # one queue based on a numeric id given to each of them.

Большинство хеш-функций будут равномерно распределять ваши данные.Я обычно использую UMAC.Но, может быть, вы можете просто попробовать хеш-функцию из реализации Python String.

Еще одним улучшением было бы избегание использования очередей и использование несинхронизируемого объекта, такого как список.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...