Предисловие
Я хочу обрабатывать задачи, перечисленные в таблице базы данных, параллельно. Не требуется рабочий код.
Настройка
- 1 Сервер базы данных PostgreSQL D
- 1 сервер обработки P
- 1 Пользовательский терминал T
с использованием Python 3.6, psycopg2.7.6, PostgreSQL 11
D содержит таблицыс данными для обработки и таблицей tasks
. Пользователь с T ssh
в P , где можно выполнить следующую команду:
python -m core.utils.task
Этот сценарий task.py
по сути является while
цикл, который получает задачу t
из таблицы tasks
в D со статусом «новый», пока не останется новых задач. Задача t
- это, в основном, набор аргументов для другой функции с именем do_something(t)
. do_something(t)
сам сделает много подключений к D , чтобы получить данные, которые должны быть обработаны, и установить задачу в состояние «выполнено» после ее завершения - цикл while
запускается заново и получает новую задачу.
Чтобы запустить python -m core.utils.task
несколько раз, я открываю несколько ssh
соединений. Не так хорошо, я знаю;threading
или multiprocessing
будет лучше. Но он только для проверки, могу ли я выполнить упомянутую команду дважды.
Существует сценарий, который управляет всеми взаимодействиями с базой данных, называемый pgsql.py
, который необходим для получения задачи, а затем do_something(t)
. Я адаптировал шаблон синглтона из этого поста SE .
Псевдокод (в основном)
task.py
import mymodule
import pgsql
def main():
while True:
r, c = pgsql.SQL.select_task() # rows and columns
task = dotdict(dict(zip(c, r[0])))
mymodule.do_something(task)
if __name__ == "__main__":
main()
mymodule.py
import pgsql
def do_something(t):
input = pgsql.SQL.get_images(t.table,t.schema,t.image_id,t.image_directory)
some_other_function(input)
pgsql.SQL.task_status(t.task_id,'done')
pgsql.py
import psycopg2 as pg
class Postgres(object):
"""Adapted from https://softwareengineering.stackexchange.com/a/358061/348371"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = object.__new__(cls)
db_config = {'dbname': 'dev01', 'host': 'XXXXXXXX',
'password': 'YYYYY', 'port': 5432, 'user': 'admin'}
try:
print('connecting to PostgreSQL database...')
connection = Postgres._instance.connection = pg.connect(**db_config)
connection.set_session(isolation_level='READ COMMITTED', autocommit=True)
except Exception as error:
print('Error: connection not established {}'.format(error))
Postgres._instance = None
else:
print('connection established')
return cls._instance
def __init__(self):
self.connection = self._instance.connection
def query(self, query):
try:
with self.connection.cursor() as cur:
cur.execute(query)
rows = cur.fetchall()
cols = [desc[0] for desc in cur.description]
except Exception as error:
print('error execting query "{}", error: {}'.format(query, error))
return None
else:
return rows, cols
def __del__(self):
self.connection.close()
db = Postgres()
class SQL():
def select_task():
s = """
UPDATE schema.tasks
SET status = 'ready'
WHERE task_id = ( SELECT task_id
FROM schema.tasks
WHERE tasks.status = 'new'
LIMIT 1)
RETURNING *
;
""".format(m=mode)
return Postgres.query(db, s)
def task_status(id,status):
s = """
UPDATE
schema.tasks
SET
status = '{s}'
WHERE
tasks.task_id = '{id}'
;
""".format(s=status,
id=id)
return Postgres.query(db, s)
Проблема
Это работает с одним ssh
соединением. Задачи извлекаются из базы данных и обрабатываются, после завершения задание устанавливается на «выполнено». Как только я открываю второе ssh
соединение во втором терминале для запуска python -m core.utils.task
(так сказать, параллельно), одинаковые строки таблицы задач обрабатываются в обоих - игнорируя, что они были обновлены.
Вопрос
Что вы предлагаете, чтобы это сработало? Есть миллионы задач, и мне нужно выполнять их параллельно. Перед внедрением threading
или multiprocessing
я хотел сначала протестировать его с несколькими ssh
соединениями, плохая идея? Я возился с настройками isolation levels
и autocommit
в psycopg2
set_session()
, но без удачи. Я проверил сеансы на сервере базы данных и вижу, что каждый процесс python -m core.utils.task
имеет свой собственный PID, подключаясь только один раз, точно так же, как этот шаблон синглтона должен работать. Любые идеи или указания, как с этим бороться, очень ценятся!