Как остановить мои процессы на холостом ходу или быть убитым? - PullRequest
3 голосов
/ 06 мая 2019

Мне нужно обработать миллионы пользователей.У меня есть миллионы user_ids, я получаю пользовательские данные из http-запроса и записываю в файл.

Я использую многопроцессорную обработку для выполнения этой задачи.Затем я использую многопоточность в каждом процессе, чтобы выполнить задачу в пакете.Это значительно повышает производительность и позволяет мне обрабатывать больше пользователей с более высокой скоростью.

Проблема :

Я нахожу через определенное время все процессыстановится неактивным.Я знаю это, глядя на монитор активности.В начале я вижу, что они используют много процессоров и имеют потоки, через некоторое время они кажутся бездействующими, и моя программа зависает.

import os
import time
import logging
import multiprocessing
import config
import json
from google.cloud import storage
from pymongo import MongoClient, UpdateOne
from queue import Queue
import threading
from multiprocessing import Pool, cpu_count

PROCESSES = multiprocessing.cpu_count() - 1

def get_tweet_objects(user, counter, lock, proc):

   # Removed ( calls a http request and writes json file to disk

    lock.acquire()
      try:
        counter.value = counter.value + 1
      finally:
        lock.release()

    print("APP ID: {app_id}, REMAINING: {app_remaining}, TOTAL USERS: {total_users}, USER: {user_id}, NO OF TWEETS: {no_tweets}, TIME TAKEN: {time_taken}"
          .format(app_id=app.APP_ID, app_remaining=0, total_users=counter.value, user_id=user["user_id"], no_tweets=len(total_tweets), time_taken=round((end - start), 2)), threading.current_thread().name, proc)


def add_tasks(task_queue, tasks):

    for task in tasks:
        task_queue.put(task)

    return task_queue


def process_tasks(task_queue, counter, lock):

    logger = multiprocessing.get_logger()
    proc = os.getpid()
    while not task_queue.empty():
        try:
            user = task_queue.get()
            do_multithreading(user, counter, lock, proc)

        except Exception as e:
            logger.error(e)
        logger.info(f'Process {proc} completed successfully')
    return True


def manage_queue(task_queue, counter, lock, proc):

    while True:
        user = task_queue.get()
        get_tweet_objects(user, counter, lock, proc)
        task_queue.task_done()


def do_multithreading(batches, counter, lock, proc):
    """Starts the multithreading"""

    # Set the number of threads.
    number_of_threads = 5

    # Initializes the queue.
    task_queue = Queue()

    # Starts the multithreading
    for i in range(number_of_threads):
        t = threading.Thread(target=manage_queue, args=[
                             task_queue, counter, lock, proc])
        t.daemon = True
        t.start()

    for batch in batches:
        task_queue.put(batch)
    task_queue.join()


def run():

    mongodb = MongoClient(host=config.MONGO_URI)["twitter"]

    existing_users = mongodb[SCREEN_NAME].find({}).limit(10000)

    batches = create_batches_of_100(existing_users)

    empty_task_queue = multiprocessing.Manager().Queue()
    full_task_queue = add_tasks(empty_task_queue, batches)
    processes = []

    counter = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    print(f'Running with {PROCESSES} processes!')
    start = time.time()
    for w in range(PROCESSES):
        p = multiprocessing.Process(
            target=process_tasks, args=(full_task_queue, counter, lock))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(f'Time taken = {time.time() - start:.10f}')


if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.ERROR)
    run()

1 Ответ

4 голосов
/ 06 мая 2019

Таким образом, есть несколько проблем с кодом.Прежде всего избегайте бесконечных циклов любой ценой, как в функции manage_queue.Примечание: я не имею в виду «избегать while True:», потому что это не означает, что это бесконечный цикл (например, вы можете иметь break внутри него).

С учетом сказанногоСамая большая проблема (которую мы обнаружили в длительном обсуждении в чате) состоит в том, что функция get_tweet_object() иногда завершается сбоем с исключением, и когда это происходит, task_queue.task_done() никогда не вызывается и, следовательно, task_queue.join() никогда не завершается.

Другойпроблема в том, что смешивание while not task_queue.empty(): с task_queue.get() является условием гонки.Что происходит, когда запускаются два параллельных потока и task_queue имеет ровно 1 элемент?Один из них будет висеть вечно.Это должно быть заменено на task_queue.get(False) с соответствующей queue.Empty ловлей.Это похоже на косметику, но дело в том, что состояние гонки рассматривается в вызове .get().При этом вам также необходимо заполнить очередь перед порождением потоков.

В общем, здесь есть изменения:

from queue import Empty

def do_multithreading(batches, counter, lock, proc):
    """Starts the multithreading"""

    # Set the number of threads.
    number_of_threads = 5

    # Initializes the queue.
    for batch in batches:
        task_queue.put(batch)

    # Starts the multithreading
    for i in range(number_of_threads):
        t = threading.Thread(target=manage_queue, args=[
                             task_queue, counter, lock, proc])
        t.daemon = True
        t.start()
    task_queue.join()

def manage_queue(task_queue, counter, lock, proc):
    while True:
        try:
            user = task_queue.get(False)
        except Empty:
            break

        try:
            get_tweet_objects(user, counter, lock, proc)
        except Exception as exc:
            print(exc)
        finally:
            task_queue.task_done()

def process_tasks(task_queue, counter, lock):
    logger = multiprocessing.get_logger()
    proc = os.getpid()
    while True:
        try:
            user = task_queue.get(False)
        except Empty:
            break
        try:
            do_multithreading(user, counter, lock, proc)
        except Exception as e:
            logger.error(e)
        logger.info(f'Process {proc} completed successfully')
    return True

При этом я настоятельно рекомендую использовать исполнителей процессов / потоков.

...