Python - Как использовать MongoDB (pymon go) и мультипроцессинг без «MongoClient, открытого перед форком». выдавать? - PullRequest
0 голосов
/ 27 апреля 2020

Я использую многопроцессорную обработку, но получаю сообщение об ошибке «MongoClient открыт перед форком». для каждого процесса. Я провел небольшое исследование и пришел к выводу, что сейчас я создаю несколько клиентов MongoClient (по одному на подпроцесс). Но я не нашел реального решения. Каждый процесс использует подключение MongoDB (я использую Pymon go в качестве разъема). Кто-нибудь может мне помочь?

Код:

def func1():
    while True:
        col1.insert_one({...})
        ...

def func2():
    while True:
        col2.insert_one({...})
        ...

if __name__ == "__main__":
    # MongoDB
    myclient = pymongo.MongoClient("mongodb://localhost:27017/")
    mydb = myclient["testdb"]
    col1 = mydb["col1"]
    col2 = mydb["col2"]

    # Multiproccesing
    p1 = Process(target=func1)
    p2 = Process(target=func2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()

1 Ответ

1 голос
/ 27 апреля 2020

Каждый процесс должен открыть свои собственные подключения к MongoDB.

Обратите внимание на предупреждение в get_mongo_client(); если вы хотите, чтобы откуда-то было безопасно вызывать, вам нужно «пометить» _mongo_client PID текущего процесса и сбросить объект, если у него неправильный PID.

_mongo_client = None  # Global per process


def get_mongo_client():
    # Make sure not to call this within the master process, or things
    # will break again.
    global _mongo_client
    if _mongo_client is None:
        _mongo_client = pymongo.MongoClient("mongodb://localhost:27017/")
    return _mongo_client


def get_mongo_col(collection, database="testdb"):
    client = get_mongo_client()
    return client[database][collection]


def func1():
    col1 = get_mongo_col("col1")
    while True:
        col1.insert_one({})
        # ...


def func2():
    col2 = get_mongo_col("col2")
    while True:
        col2.insert_one({})
        # ...


def main():
    # Multiproccesing
    p1 = Process(target=func1)
    p2 = Process(target=func2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()


if __name__ == "__main__":
    main()
...