Мне нужно обработать миллионы пользователей.У меня есть миллионы user_ids, я получаю пользовательские данные из http-запроса и записываю в файл.
Я использую многопроцессорную обработку для выполнения этой задачи.Затем я использую многопоточность в каждом процессе, чтобы выполнить задачу в пакете.Это значительно повышает производительность и позволяет мне обрабатывать больше пользователей с более высокой скоростью.
Проблема :
Я нахожу через определенное время все процессыстановится неактивным.Я знаю это, глядя на монитор активности.В начале я вижу, что они используют много процессоров и имеют потоки, через некоторое время они кажутся бездействующими, и моя программа зависает.
import os
import time
import logging
import multiprocessing
import config
import json
from google.cloud import storage
from pymongo import MongoClient, UpdateOne
from queue import Queue
import threading
from multiprocessing import Pool, cpu_count
PROCESSES = multiprocessing.cpu_count() - 1
def get_tweet_objects(user, counter, lock, proc):
# Removed ( calls a http request and writes json file to disk
lock.acquire()
try:
counter.value = counter.value + 1
finally:
lock.release()
print("APP ID: {app_id}, REMAINING: {app_remaining}, TOTAL USERS: {total_users}, USER: {user_id}, NO OF TWEETS: {no_tweets}, TIME TAKEN: {time_taken}"
.format(app_id=app.APP_ID, app_remaining=0, total_users=counter.value, user_id=user["user_id"], no_tweets=len(total_tweets), time_taken=round((end - start), 2)), threading.current_thread().name, proc)
def add_tasks(task_queue, tasks):
for task in tasks:
task_queue.put(task)
return task_queue
def process_tasks(task_queue, counter, lock):
logger = multiprocessing.get_logger()
proc = os.getpid()
while not task_queue.empty():
try:
user = task_queue.get()
do_multithreading(user, counter, lock, proc)
except Exception as e:
logger.error(e)
logger.info(f'Process {proc} completed successfully')
return True
def manage_queue(task_queue, counter, lock, proc):
while True:
user = task_queue.get()
get_tweet_objects(user, counter, lock, proc)
task_queue.task_done()
def do_multithreading(batches, counter, lock, proc):
"""Starts the multithreading"""
# Set the number of threads.
number_of_threads = 5
# Initializes the queue.
task_queue = Queue()
# Starts the multithreading
for i in range(number_of_threads):
t = threading.Thread(target=manage_queue, args=[
task_queue, counter, lock, proc])
t.daemon = True
t.start()
for batch in batches:
task_queue.put(batch)
task_queue.join()
def run():
mongodb = MongoClient(host=config.MONGO_URI)["twitter"]
existing_users = mongodb[SCREEN_NAME].find({}).limit(10000)
batches = create_batches_of_100(existing_users)
empty_task_queue = multiprocessing.Manager().Queue()
full_task_queue = add_tasks(empty_task_queue, batches)
processes = []
counter = multiprocessing.Value('i', 0)
lock = multiprocessing.Lock()
print(f'Running with {PROCESSES} processes!')
start = time.time()
for w in range(PROCESSES):
p = multiprocessing.Process(
target=process_tasks, args=(full_task_queue, counter, lock))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f'Time taken = {time.time() - start:.10f}')
if __name__ == '__main__':
multiprocessing.log_to_stderr(logging.ERROR)
run()