SQLAlchemy странное поведение потоков - PullRequest
0 голосов
/ 29 мая 2018

Рассмотрим следующий пример кода с использованием Python 3.6.5 и SQLAlchemy 1.2.7

import threading
from concurrent.futures import ThreadPoolExecutor

from sqlalchemy import create_engine, Column, Integer, Boolean
from sqlalchemy.exc import OperationalError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session, Session

engine = create_engine("sqlite:///threading_sqlalchemy.db")
base = declarative_base(engine)
smaker = sessionmaker(engine)
scopedmaker: scoped_session = scoped_session(smaker)

dblock = threading.Lock()


class Key(base):
    __tablename__ = "Key"
    id = Column(Integer, primary_key=True)
    value = Column(Integer, nullable=False, unique=True, index=True)
    taken = Column(Boolean, nullable=False, default=False)

    def __repr__(self):
        return f"<Key id={self.id}, value={self.value}, taken={self.taken}>"


try:
    Key.__table__.drop()
    # this is also quite funny, if the table doesn't exist it throws:
    # sqlite3.OperationalError: no such table: Key
    # when there is literally a sqlalchemy.exc.NoSuchTableError
except OperationalError:
    pass
base.metadata.create_all()


def gen_keys(n):
    print(f"made in {threading.current_thread()}")
    with dblock:
        session: Session = scopedmaker()
        session.bulk_save_objects([Key(value=i * 100) for i in range(0, n)])
        session.commit()


def take_keys(n):
    print(f"used in {threading.current_thread()}")
    with dblock:
        session: Session = scopedmaker()
        keys = session.query(Key).filter(Key.taken == False).limit(n).all()
        for key in keys:
            key.taken = True
        print(keys)
        session.commit()


def take_keys_2(n):
    print(f"used in {threading.current_thread()}")
    with dblock:
        session: Session = scopedmaker()
        keys = session.query(Key).filter(Key.taken == False).limit(n).all()
        for key in keys:
            key.taken = True
        session.commit()
        print(keys)


gen_keys(100)

# take_keys works just as expected
with ThreadPoolExecutor() as executor:
    for _ in range(0, 5):
        executor.submit(take_keys, 10)

# take_keys_2 breaks, raises following error
# >>> sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread.
# >>> The object was created in thread id 12340 and this is thread id 4312
# according to the console log, 12340 is one of the ThreadPoolExecutor threads, and 4312 is the main thread
with ThreadPoolExecutor() as executor:
    for _ in range(0, 5):
        executor.submit(take_keys_2, 10)

У меня просто есть действительно простой класс Key, который имеет значение и может быть помечен как taken,Подумайте о чем-то вроде бесплатной раздачи, в которой вы не хотели бы распространять один и тот же продукт среди разных потенциальных клиентов.Я использовал это для проверки существующих условий гонки и вынудил меня использовать блокировку доступа к базе данных, ничего страшного, я могу с этим смириться.

Что я действительно не понимаю, так это почему take_keys работает, но take_keys_2 прерывается, когда единственная разница между ними - это позиция оператора print(keys).Тем более, что в нефункциональном примере сообщение об ошибке похоже на то, что я использую созданные объекты в неправильном потоке (нет, я просто использую его после session.commit() в том же потоке, который его создал.

Если бы кто-нибудь мог пролить свет на то, почему это происходит, я был бы рад.

1 Ответ

0 голосов
/ 29 мая 2018

Теперь у меня нет всех деталей, но достаточно, чтобы разобраться в вашей ситуации. Поддержка многопоточности в SQLite не очень хороша.По этой причине поведение пула SQLAlchemy по умолчанию равно SingletonThreadPool при использовании базы данных в памяти или NullPool при использовании файла.Последнее означает, что пул вообще отсутствует, или, другими словами, соединение всегда открывается и закрывается в соответствии с запросом.

Позиция print() имеет значение, потому что приведенный выше вызов session.commit() истекает все загруженное состояние базы данныхобъекты в сессии.Таким образом, чтобы напечатать список ключей, который в итоге вызывает их __repr__, SQLAlchemy должен повторно получить состояние каждого объекта.Если вы добавите echo=True к своему вызову к create_engine(), это станет очевидным.

После всего этого ваш session в take_keys_2 удерживает соединение с открытой транзакцией.Вот где она становится немного мутной: при выходе из функции session выходит из области видимости, и это означает, что удерживаемое ею соединение в конечном итоге возвращается в пул.Но пул NullPool, поэтому он завершает и закрывает соединение и сбрасывает его.Завершение означает откат любой открытой транзакции, и вот что терпит неудачу:

Traceback (most recent call last):
  File "~/Work/sqlalchemy/lib/sqlalchemy/pool.py", line 705, in _finalize_fairy
    fairy._reset(pool)
  File "~/Work/sqlalchemy/lib/sqlalchemy/pool.py", line 876, in _reset
    pool._dialect.do_rollback(self)
  File "~/Work/sqlalchemy/lib/sqlalchemy/engine/default.py", line 457, in do_rollback
    dbapi_connection.rollback()
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 140683561543424 and this is thread id 140683635291968

Завершение выполняется в «фиктивном» потоке во время выключения интерпретатора вместо рабочего, так как соединение было оставлено.

Если, например, вы добавляете вызов к session.rollback() после print(keys):

def take_keys_2(n):
    ...
    with dblock:
        ...
        session.commit()
        print(keys)
        session.rollback()

, соединение возвращается в пул явно , и take_keys_2 работает какЧто ж.Другой вариант - использовать expire_on_commit=False, чтобы после фиксации не требовалось никаких дополнительных запросов для печати представления объектов Key:

def take_keys_2(n):
    with dblock:
        session: Session = scopedmaker(expire_on_commit=False)
        ...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...