Я следовал приведенному ниже коду для реализации параллельного запроса на выборку в базе данных postgres:
https://tech.geoblink.com/2017/07/06/parallelizing-queries-in-postgresql-with-python/
Моя основная проблема заключается в том, что у меня ~ 6k запросов, которые должны бытьвыполнено, и я пытаюсь оптимизировать выполнение этих запросов выбора.Первоначально это был один запрос, в котором where id in (...)
содержал все 6k идентификаторов предикатов, но у меня возникли проблемы с запросом, использующим более 4 ГБ ОЗУ на компьютере, на котором он выполнялся, поэтому я решил разбить его на 6 000 отдельных запросов, которые, когдасинхронно поддерживает постоянное использование памяти.Однако это занимает намного больше времени, что не является проблемой для моего варианта использования.Несмотря на это, я стараюсь максимально сократить время.
Вот как выглядит мой код:
class PostgresConnector(object):
def __init__(self, db_url):
self.db_url = db_url
self.engine = self.init_connection()
self.pool = self.init_pool()
def init_pool(self):
CPUS = multiprocessing.cpu_count()
return multiprocessing.Pool(CPUS)
def init_connection(self):
LOGGER.info('Creating Postgres engine')
return create_engine(self.db_url)
def run_parallel_queries(self, queries):
results = []
try:
for i in self.pool.imap_unordered(self.execute_parallel_query, queries):
results.append(i)
except Exception as exception:
LOGGER.error('Error whilst executing %s queries in parallel: %s', len(queries), exception)
raise
finally:
self.pool.close()
self.pool.join()
LOGGER.info('Parallel query ran producing %s sets of results of type: %s', len(results), type(results))
return list(chain.from_iterable(results))
def execute_parallel_query(self, query):
con = psycopg2.connect(self.db_url)
cur = con.cursor()
cur.execute(query)
records = cur.fetchall()
con.close()
return list(records)
Однако, когда это выполняется, я получаю следующую ошибку:
TypeError: can't pickle _thread.RLock objects
Я читал много похожих вопросов, касающихся использования многопроцессорных и маринованных объектов, но я не могу на всю жизнь понять, что я делаю неправильно.
Пул обычно одинза процесс (который я считаю наилучшей практикой), но разделяемый на экземпляр класса соединителя, чтобы не создавать пул для каждого использования метода parallel_query.
Главный ответ на аналогичный вопрос:
Доступ к пулу соединений MySQL из многопроцессорной обработки Python
Показывает почти идентичную мою реализацию, за исключением использования MySql вместо Postgres.
Я что-то делаюне так?
Спасибо!
РЕДАКТИРОВАТЬ:
Я нашел этот ответ:
Python Postgres psycopg2 ThreadedConnectionPool исчерпан
, который невероятно детален и выглядит так, как будто я неправильно понял, что дает multiprocessing.Pool
против пула соединений, такого как ThreadedConnectionPool
.Однако в первой ссылке нет упоминания о необходимости каких-либо пулов соединений и т. Д. Это решение кажется хорошим, но, кажется, много кода для того, что я считаю довольно простой проблемой?
РЕДАКТИРОВАТЬ 2:
Таким образом, приведенная выше ссылка решает другую проблему, с которой я, вероятно, столкнулся бы в любом случае, поэтому я рад, что нашел это, но это не решает первоначальную проблему невозможности использовать imap_unordered
вплоть до ошибки травления.Очень расстраивает.
Наконец, я думаю, что, вероятно, стоит отметить, что это выполняется в Heroku, на рабочем динамо, с использованием Redis rq для планирования, фоновых задач и т. Д. И размещенного экземпляра Postgres в качестве базы данных.