Несколько соединений с БД, использующих UPDATE ... RETURNING, похоже, не обновляют строки в таблице задач - PullRequest
1 голос
/ 11 октября 2019

Предисловие

Я хочу обрабатывать задачи, перечисленные в таблице базы данных, параллельно. Не требуется рабочий код.

Настройка

  • 1 Сервер базы данных PostgreSQL D
  • 1 сервер обработки P
  • 1 Пользовательский терминал T

с использованием Python 3.6, psycopg2.7.6, PostgreSQL 11

D содержит таблицыс данными для обработки и таблицей tasks. Пользователь с T ssh в P , где можно выполнить следующую команду:

python -m core.utils.task

Этот сценарий task.py по сути является while цикл, который получает задачу t из таблицы tasks в D со статусом «новый», пока не останется новых задач. Задача t - это, в основном, набор аргументов для другой функции с именем do_something(t). do_something(t) сам сделает много подключений к D , чтобы получить данные, которые должны быть обработаны, и установить задачу в состояние «выполнено» после ее завершения - цикл while запускается заново и получает новую задачу.

Чтобы запустить python -m core.utils.task несколько раз, я открываю несколько ssh соединений. Не так хорошо, я знаю;threading или multiprocessing будет лучше. Но он только для проверки, могу ли я выполнить упомянутую команду дважды.

Существует сценарий, который управляет всеми взаимодействиями с базой данных, называемый pgsql.py, который необходим для получения задачи, а затем do_something(t). Я адаптировал шаблон синглтона из этого поста SE .

Псевдокод (в основном)

task.py

import mymodule
import pgsql

def main():
    while True:
        r, c = pgsql.SQL.select_task()  # rows and columns
        task = dotdict(dict(zip(c, r[0])))
        mymodule.do_something(task)

if __name__ == "__main__":
    main()

mymodule.py

import pgsql

def do_something(t):
    input = pgsql.SQL.get_images(t.table,t.schema,t.image_id,t.image_directory)
    some_other_function(input)
    pgsql.SQL.task_status(t.task_id,'done')

pgsql.py

import psycopg2 as pg

class Postgres(object):
    """Adapted from https://softwareengineering.stackexchange.com/a/358061/348371"""
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = object.__new__(cls)
            db_config = {'dbname': 'dev01', 'host': 'XXXXXXXX',
                         'password': 'YYYYY', 'port': 5432, 'user': 'admin'}
            try:
                print('connecting to PostgreSQL database...')
                connection = Postgres._instance.connection = pg.connect(**db_config)
                connection.set_session(isolation_level='READ COMMITTED', autocommit=True)
            except Exception as error:
                print('Error: connection not established {}'.format(error))
                Postgres._instance = None

            else:
                print('connection established')

        return cls._instance

    def __init__(self):
        self.connection = self._instance.connection

    def query(self, query):
        try:
            with self.connection.cursor() as cur:
                cur.execute(query)
                rows = cur.fetchall()
                cols = [desc[0] for desc in cur.description]
        except Exception as error:
            print('error execting query "{}", error: {}'.format(query, error))
            return None
        else:
            return rows, cols

    def __del__(self):
        self.connection.close()

db = Postgres()
class SQL():
    def select_task():
        s = """
            UPDATE schema.tasks
               SET status = 'ready'
             WHERE task_id = (  SELECT task_id
                                  FROM schema.tasks
                                 WHERE tasks.status = 'new'
                                 LIMIT 1)
            RETURNING *
            ;
            """.format(m=mode)
        return Postgres.query(db, s)


    def task_status(id,status):
        s = """
            UPDATE
                schema.tasks
            SET
                status = '{s}'
            WHERE
                tasks.task_id = '{id}'
            ;
            """.format(s=status,
                       id=id)
        return Postgres.query(db, s)

Проблема

Это работает с одним ssh соединением. Задачи извлекаются из базы данных и обрабатываются, после завершения задание устанавливается на «выполнено». Как только я открываю второе ssh соединение во втором терминале для запуска python -m core.utils.task (так сказать, параллельно), одинаковые строки таблицы задач обрабатываются в обоих - игнорируя, что они были обновлены.

Вопрос

Что вы предлагаете, чтобы это сработало? Есть миллионы задач, и мне нужно выполнять их параллельно. Перед внедрением threading или multiprocessing я хотел сначала протестировать его с несколькими ssh соединениями, плохая идея? Я возился с настройками isolation levels и autocommit в psycopg2 set_session(), но без удачи. Я проверил сеансы на сервере базы данных и вижу, что каждый процесс python -m core.utils.task имеет свой собственный PID, подключаясь только один раз, точно так же, как этот шаблон синглтона должен работать. Любые идеи или указания, как с этим бороться, очень ценятся!

1 Ответ

2 голосов
/ 11 октября 2019

Основная проблема заключается в том, что выполнение одной задачи не является атомарной операцией. Поэтому в разных сеансах SSH одну и ту же задачу можно обрабатывать несколько раз.

В этой реализации вы можете попытаться использовать статус "INPROGRESS" для задачи, чтобы не получать задачи, которые уже обрабатываются (со статусом "INPROGRESS"). Но обязательно используйте автокоммит.

Но я бы реализовал это, используя потоки и пул соединений с базой данных. И будет извлекать задачи в пакетах, используя OFFSET и LIMIT. Функции do_something, select_task и task_status будут реализованы для пакета задач.

Кроме того, нет необходимости реализовывать класс Postgres в качестве одиночного.


Изменено (см. Комментарии ниже)

  • Вы можете добавить FOR UPDATE SKIP LOCKED к запросу SQL в текущей реализации (см. url ).
  • Если вы хотите работать с пакетами, то разделите данные по некоторому последовательному столбцу (ну, или просто сортируйте данные в таблице).
  • Моя реализация с использованием пакетов .
  • Это может быть реализовано с использованием ThreadPoolExecutor и PersistentConnectionPool.
...