Я пытаюсь создать приложение, которое будет «проверять» ячейку, представляющую собой квадрат, покрывающий часть земли в географической базе данных c, и выполнять анализ объектов в этой ячейке. Поскольку у меня есть много ячеек для обработки, я использую многопроцессорный подход.
Я работал в моем объекте примерно так:
class DistributedGeographicConstraintProcessor:
...
def _process_cell(self, conn_string):
conn = pg2.connect(conn_string)
try:
cur = conn.cursor()
cell_id = self._check_out_cell(cur)
conn.commit()
print(f"processing cell_id {cell_id}...")
for constraint in self.constraints:
# print(f"processing {constraint.name()}...")
query = constraint.prepare_distributed_query(self.job, self.grid)
cur.execute(query, {
"buffer": constraint.buffer(),
"cell_id": cell_id,
"name": constraint.name(),
"simplify_tolerance": constraint.simplify_tolerance()
})
# TODO: do a final race condition check to further suppress duplicates
self._check_in_cell(cur, cell_id)
conn.commit()
finally:
del cur
conn.close()
return None
def run(self):
while True:
if not self._job_finished():
params = [self.conn_string] * self.num_cores
processes = []
for param in params:
process = mp.Process(target=self._process_cell, args=(param,))
processes.append(process)
sleep(0.1) # Prevent multiple processes from checkout out the same grid square
process.start()
for process in processes:
process.join()
else:
self._finalize_job()
break
Но проблема в том, что он будет только запустите четыре процесса и дождитесь, пока все они завершат sh, прежде чем запускать четыре новых процесса.
Я хочу сделать так, чтобы, когда один процесс завершил свою работу, он сразу же начал работать над следующей ячейкой, даже если ее ко-процессы еще не завершены.
Я не уверен, как это реализовать, и я пытался использовать пул, подобный этому:
def run(self):
pool = mp.Pool(self.num_cores)
unprocessed_cells = self._unprocessed_cells()
for i in pool.imap(self._process_cell, unprocessed_cells):
print(i)
Но это просто говорит мне, что соединение невозможно мариновать:
TypeError: can't pickle psycopg2.extensions.connection objects
Но я не понимаю почему, потому что это та же самая функция, которую я использую в функции imap
, как и в цели Process
.
Я уже посмотрел эти темы, вот почему они не отвечают на мой вопрос:
- Ошибка при подключении к PostgreSQL не может засечь объекты psycopg2.extensions.connection - Ответ здесь указывает только на то, что несколько процессов не могут использовать одно и то же соединение. Я знаю об этом и инициализирую процесс внутри функции, которая выполняется в дочернем процессе. Кроме того, как я уже упоминал, он работает, когда я сопоставляю функцию отдельным
Process
экземплярам с одной и той же функцией с теми же входами. - Результат многопроцессорной обработки запроса psycopg2. «Не могу засечь объекты psycopg2.extensions.connection» - На этот вопрос нет ни ответа, ни комментариев, и код в любом случае не поврежден - автор ссылается на функцию, которая не указана в вопросе, и в любом случае очевидно, что они явно пытаются разделить один и тот же курсор между процессами.