Не могу выбрать объекты psycopg2.extensions.connection при использовании pool.imap, но может быть сделано в отдельных процессах - PullRequest
1 голос
/ 12 апреля 2020

Я пытаюсь создать приложение, которое будет «проверять» ячейку, представляющую собой квадрат, покрывающий часть земли в географической базе данных c, и выполнять анализ объектов в этой ячейке. Поскольку у меня есть много ячеек для обработки, я использую многопроцессорный подход.

Я работал в моем объекте примерно так:

class DistributedGeographicConstraintProcessor:

    ...

    def _process_cell(self, conn_string):

        conn = pg2.connect(conn_string)
        try:
            cur = conn.cursor()

            cell_id = self._check_out_cell(cur)
            conn.commit()
            print(f"processing cell_id {cell_id}...")

            for constraint in self.constraints:
                # print(f"processing {constraint.name()}...")
                query = constraint.prepare_distributed_query(self.job, self.grid)
                cur.execute(query, {
                    "buffer": constraint.buffer(),
                    "cell_id": cell_id,
                    "name": constraint.name(),
                    "simplify_tolerance": constraint.simplify_tolerance()
                })

            # TODO: do a final race condition check to further suppress duplicates
            self._check_in_cell(cur, cell_id)
            conn.commit()

        finally:
            del cur
            conn.close()

        return None

    def run(self):

        while True:
            if not self._job_finished():
                params = [self.conn_string] * self.num_cores
                processes = []
                for param in params:
                    process = mp.Process(target=self._process_cell, args=(param,))
                    processes.append(process)
                    sleep(0.1)  # Prevent multiple processes from checkout out the same grid square
                    process.start()
                for process in processes:
                    process.join()
            else:
                self._finalize_job()
                break

Но проблема в том, что он будет только запустите четыре процесса и дождитесь, пока все они завершат sh, прежде чем запускать четыре новых процесса.

Я хочу сделать так, чтобы, когда один процесс завершил свою работу, он сразу же начал работать над следующей ячейкой, даже если ее ко-процессы еще не завершены.

Я не уверен, как это реализовать, и я пытался использовать пул, подобный этому:

def run(self):

    pool = mp.Pool(self.num_cores)
    unprocessed_cells = self._unprocessed_cells()
    for i in pool.imap(self._process_cell, unprocessed_cells):
        print(i)

Но это просто говорит мне, что соединение невозможно мариновать:

TypeError: can't pickle psycopg2.extensions.connection objects

Но я не понимаю почему, потому что это та же самая функция, которую я использую в функции imap, как и в цели Process.

Я уже посмотрел эти темы, вот почему они не отвечают на мой вопрос:

1 Ответ

1 голос
/ 13 апреля 2020

Я предполагаю, что вы присоединяете какой-либо объект соединения к self; Попробуйте переписать ваше решение, используя только функции (без классов / методов).

Вот упрощенная версия решения для одного производителя / нескольких рабочих , которое я использовал некоторое время go:

def worker(param):
    //connect to pg
    //do work


def main():
    pool = Pool(processes=NUM_PROC)
    tasks = []
    for param in params:
        t = pool.apply_async(utils.process_month, args=(param, ))
        tasks.append(t)
    pool.close()
    finished = false
    while not finished:     
        finished = True
        for t in tasks:
            if not t.ready():
                finished = False
                break
        time.sleep(1)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...