multiprocessing / psycopg2 TypeError: невозможно выбрать объекты _thread.RLock - PullRequest
0 голосов
/ 27 сентября 2018

Я следовал приведенному ниже коду для реализации параллельного запроса на выборку в базе данных postgres:

https://tech.geoblink.com/2017/07/06/parallelizing-queries-in-postgresql-with-python/

Моя основная проблема заключается в том, что у меня ~ 6k запросов, которые должны бытьвыполнено, и я пытаюсь оптимизировать выполнение этих запросов выбора.Первоначально это был один запрос, в котором where id in (...) содержал все 6k идентификаторов предикатов, но у меня возникли проблемы с запросом, использующим более 4 ГБ ОЗУ на компьютере, на котором он выполнялся, поэтому я решил разбить его на 6 000 отдельных запросов, которые, когдасинхронно поддерживает постоянное использование памяти.Однако это занимает намного больше времени, что не является проблемой для моего варианта использования.Несмотря на это, я стараюсь максимально сократить время.

Вот как выглядит мой код:

class PostgresConnector(object):
    def __init__(self, db_url):
        self.db_url = db_url
        self.engine = self.init_connection()
        self.pool = self.init_pool()

    def init_pool(self):
        CPUS = multiprocessing.cpu_count()
        return multiprocessing.Pool(CPUS)

    def init_connection(self):
        LOGGER.info('Creating Postgres engine')
        return create_engine(self.db_url)

    def run_parallel_queries(self, queries):
        results = []
        try:
            for i in self.pool.imap_unordered(self.execute_parallel_query, queries):
                results.append(i)
        except Exception as exception:
            LOGGER.error('Error whilst executing %s queries in parallel: %s', len(queries), exception)
            raise
        finally:
            self.pool.close()
            self.pool.join()

        LOGGER.info('Parallel query ran producing %s sets of results of type: %s', len(results), type(results))

        return list(chain.from_iterable(results))

    def execute_parallel_query(self, query):
        con = psycopg2.connect(self.db_url)
        cur = con.cursor()
        cur.execute(query)
        records = cur.fetchall()
        con.close()

        return list(records)

Однако, когда это выполняется, я получаю следующую ошибку:

TypeError: can't pickle _thread.RLock objects

Я читал много похожих вопросов, касающихся использования многопроцессорных и маринованных объектов, но я не могу на всю жизнь понять, что я делаю неправильно.

Пул обычно одинза процесс (который я считаю наилучшей практикой), но разделяемый на экземпляр класса соединителя, чтобы не создавать пул для каждого использования метода parallel_query.

Главный ответ на аналогичный вопрос:

Доступ к пулу соединений MySQL из многопроцессорной обработки Python

Показывает почти идентичную мою реализацию, за исключением использования MySql вместо Postgres.

Я что-то делаюне так?

Спасибо!

РЕДАКТИРОВАТЬ:

Я нашел этот ответ:

Python Postgres psycopg2 ThreadedConnectionPool исчерпан

, который невероятно детален и выглядит так, как будто я неправильно понял, что дает multiprocessing.Pool против пула соединений, такого как ThreadedConnectionPool.Однако в первой ссылке нет упоминания о необходимости каких-либо пулов соединений и т. Д. Это решение кажется хорошим, но, кажется, много кода для того, что я считаю довольно простой проблемой?

РЕДАКТИРОВАТЬ 2:

Таким образом, приведенная выше ссылка решает другую проблему, с которой я, вероятно, столкнулся бы в любом случае, поэтому я рад, что нашел это, но это не решает первоначальную проблему невозможности использовать imap_unordered вплоть до ошибки травления.Очень расстраивает.

Наконец, я думаю, что, вероятно, стоит отметить, что это выполняется в Heroku, на рабочем динамо, с использованием Redis rq для планирования, фоновых задач и т. Д. И размещенного экземпляра Postgres в качестве базы данных.

1 Ответ

0 голосов
/ 08 октября 2018

Проще говоря, соединение postgres и пул соединений sqlalchemy являются поточно-ориентированными, однако они не являются форк-безопасными.

Если вы хотите использовать многопроцессорность, вы должны инициализировать движок в каждом дочернем процессе после разветвления.

Вместо этого вы должны использовать многопоточность, если вы хотите совместно использовать движки.

См. Безопасность потоков и процессов в документации psycopg2. :

Соединения libpq не должны использоваться разветвленными процессами, поэтому при использовании такого модуля, как многопроцессорная обработка или развертывание разветвленной сетитакой метод, как FastCGI, обязательно создайте соединения после разветвления.

Если вы используете multiprocessing.Pool, есть инициализатор аргумента с ключевым словом, который можно использовать для однократного запуска кода в каждом дочернем процессе.Попробуйте это:

class PostgresConnector(object):
    def __init__(self, db_url):
        self.db_url = db_url
        self.pool = self.init_pool()

    def init_pool(self):
        CPUS = multiprocessing.cpu_count()
        return multiprocessing.Pool(CPUS, initializer=self.init_connection(self.db_url))

    @classmethod
    def init_connection(cls, db_url):
        def _init_connection():
            LOGGER.info('Creating Postgres engine')
            cls.engine = create_engine(db_url)
        return _init_connection

    def run_parallel_queries(self, queries):
        results = []
        try:
            for i in self.pool.imap_unordered(self.execute_parallel_query, queries):
                results.append(i)
        except Exception as exception:
            LOGGER.error('Error whilst executing %s queries in parallel: %s', len(queries), exception)
            raise
        finally:
            pass
            #self.pool.close()
            #self.pool.join()

        LOGGER.info('Parallel query ran producing %s sets of results of type: %s', len(results), type(results))

        return list(chain.from_iterable(results))

    def execute_parallel_query(self, query):
        with self.engine.connect() as conn:
            with conn.begin():
                result = conn.execute(query)
                return result.fetchall()

    def __getstate__(self):
        # this is a hack, if you want to remove this method, you should
        # remove self.pool and just pass pool explicitly
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

Теперь, чтобы решить проблему XY.

Первоначально это был один запрос с идентификатором where в (...), содержащем всеИдентификаторы предикатов 6k, но у меня возникли проблемы с запросом, использующим более 4 ГБ ОЗУ на компьютере, на котором он работал, поэтому я решил разбить его на 6k отдельных запросов, которые синхронно поддерживают постоянное использование памяти.

Вместо этого вы можете захотеть сделать один из следующих вариантов:

  1. написать подзапрос, который генерирует все 6000 идентификаторов, и использовать подзапрос в исходном массовом запросе.
  2. , как указано выше, но запишите подзапрос в виде CTE
  3. , если ваш список идентификаторов приходит из внешнего источника (т.е. не из базы данных), тогда вы можете создатьвременная таблица, содержащая 6000 идентификаторов, а затем выполнить исходный массовый запрос к временной таблице

Однако, если вы настаиваете на запуске 6000 идентификаторов через python, то самый быстрый запрос, скорее всего, не выполнит все 6000 идентификаторов.за один раз (который исчерпает память), ни для запуска 6000 отдельных запросов.Вместо этого вы можете попытаться разделить запросы на части.Например, отправьте 500 идентификаторов одновременно.Вам придется поэкспериментировать с размером чанка, чтобы определить наибольшее количество идентификаторов, которые вы можете отправить за один раз, при этом все еще с комфортом в рамках вашего бюджета памяти.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...