Почему каждый поток получает новую копию объекта packageManager? Как я могу использовать только один? - PullRequest
0 голосов
/ 08 июля 2019

Я строю распределенный паук в Python, есть проблема с кодом. Я создал объект packagemanager снаружи и передал его в gensismanager.py, и я создал 4 потока на основе 4 функций в gensismanager. но когда каждый из них вызывает self.packagemanaer, он вызывает другой объект, он получает свою копию этого. Зачем? Как я могу поделиться только одной копией этого? Спасибо!

в GensisManamger.py

class GensisManager(object):
    host_ip = '192.168.1.118'
    host_port = 10001
    authkey = 'pathea'

    def __init__(self, service, packageManager):
        self.logManager = LogManager(service)
        self.packageFactory = PackageFactory()
        self.service = service
        self.dataWrapper = DataWrapper()
        self.cnames = DataTypeNames()
        self.analyzeManager = AnalyzeManager(self.service)
        self.packageManager = packageManager
        print('init pack ', self.packageManager)

    def start_gensisManager_master(self, task_queue, result_queue):
        def _get_task_queue():
            return task_queue
        def _get_result_queue():
            return result_queue

        BaseManager.register('get_task_queue', callable = _get_task_queue)
        BaseManager.register('get_result_queue', callable = _get_result_queue)
        # BaseManager.register('_get_ownership_queue', callable = _get_ownership_result_queue)

        manager = BaseManager(address = (self.host_ip, self.host_port), authkey = self.authkey.encode('utf-8'))
        print('[GS] Usernet service created.')
        return manager

    def setStatTool(self, statTool):
        self.statTool = statTool

    def package_manager_process_distribute(self, datas, task_queue, wait_queue):
        # check logger for breakpoint(new packages)
        if datas is None:
            sys.exit('[GS] Error: no packages to start.')

        initData = datas[self.cnames.data]
        initStat = datas[self.cnames.stat]
        # init packages with found packages or new packages
        self.packageManager.add_new_packages(initData)
        print('aaaaa', self.packageManager)
        if initStat != None:
            self.analyzeManager.initStatFromOldStat(initStat)

        print('[GS] Package distributor thread running.......')
        while True:
            time.sleep(1)

            while self.packageManager.has_new_package():
                # get new package from manager and send it to task queue awaiting for slaves
    #            new_package = packageManager.get_new_package()
                # NEED PACKAGE GENERATOR
                # PUT IN A PACKAGE
    #            task_queue.put(new_package)
                # add work end conditions. end code not completed
                break

            while not wait_queue.empty():
                print('[GS] Added a failed package to task queue.')
                package = wait_queue.get()
                self.packageManager.remove_visited_package(package)

    def package_manager_process_collect(self, conn_queue):
        print('[GS] Package collector thread running.......')
        print('ccc', self.packageManager)
        while True:
            while not conn_queue.empty():
                try:
                    datas = conn_queue.get()
                    package_manager.add_new_packages(datas)
                except BaseException as e:
                    time.sleep(0.5)

в Gensis.py

packageManager = PackageManager(service)
    print('ssss', packageManager)
    manager = GensisManager(service, packageManager)



datas = manager.loadInitData()
package_manager_process_distribute = Process(target = manager.package_manager_process_distribute, args = (datas, task_queue, wait_queue,))
package_manager_process_collect = Process(target = manager.package_manager_process_collect, args = (conn_queue,))
result_analysis_process = Process(target = manager.result_analysis_process, args = (result_queue, conn_queue, store_queue, wait_queue,))
store_process = Process(target = manager.store_process, args = (store_queue,))

package_manager_process_distribute.start()
package_manager_process_collect.start()
result_analysis_process.start()
store_process.start()

package_manager_process_distribute.join()
package_manager_process_collect.join()
result_analysis_process.join()
store_process.join()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...