В моем случае я хочу обрабатывать много документов (100 мил), хранящихся в таблице psql, параллельно. Чтобы каждый работник не мог обрабатывать один и тот же документ одновременно, я создал таблицу очередей. Алгоритм для каждого работника: ...
- Выбрать документ, идентификатор которого пока отсутствует в таблице очередей
- Вставить запись в таблицу очередей со ссылкой на эту спецификацию c документ (идентификатор) и статус = in_progress (0)
- Обработка документа
- Обновление записи в таблице очередей (статус = выполнено (1))
- Go назад 1. 1. 1012 *
Для этого я создал изолированный запрос транзакции:
BEGIN;
WITH TMP AS (SELECT id FROM documents WHERE id NOT IN (SELECT reference_id FROM queue) LIMIT 1)
INSERT INTO queue (reference_id, status) SELECT (SELECT id FROM TMP), 0 RETURNING reference_id;
COMMIT;
Используя psql cli, этот оператор возвращает:
BEGIN
reference_id
------------
42
(1 row)
INSERT 0 1
COMMIT
Работает как шарм.
Но когда я использую его в скрипте, написанном в python3 .7 с использованием sqlalchemy, я не получу строку из этой транзакции:
from sqlalchemy import create_engine
engine = create_engine('postgres://db:db@localhost:5432/db')
transaction = '''
BEGIN;
WITH TMP AS (SELECT id FROM documents WHERE id NOT IN (SELECT reference_id FROM queue) LIMIT 1)
INSERT INTO queue (reference_id, status) SELECT (SELECT id FROM TMP), 0 RETURNING reference_id;
COMMIT;
'''
with engine.connect() as con:
row = con.execute(transaction)
print(row.first())
Последняя строка поднимается .. .
sqlalchemy.exc.ResourceClosedError: This result object does not return rows. It has been closed automatically.
Могу ли я получить доступ к этому reference_id
из изолированной транзакции? Или еще лучше переменная TMP
?