Я пытаюсь создать кэш sqlite3 в памяти для хранения oauth-токенов, но сталкиваюсь с проблемами, связанными с многопоточностью.После запуска нескольких тестов я заметил, что поведение существенно отличается от баз данных не в памяти и многопоточности.
Примечательно, что потоки считывателя немедленно завершаются с ошибкой «таблица заблокирована», если поток записывающего записал без фиксации.Это верно для нескольких потоков даже при isolation_level=None
.
. Это не просто то, что считыватели блокируются до завершения транзакции, а скорее они сразу же завершаются неудачей, независимо от timeout
или PRAGMA busy_timeout = 10000
.
Единственный способ заставить его работать - это установить isolation_level=None
и сделать PRAGMA read_uncommitted=TRUE
.Однако я бы предпочел этого не делать.
Можно ли позволить потокам считывателя просто подождать блокировку вместо немедленного сбоя?
import sqlite3
import threading
def get_conn(name, is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None):
uri = 'file:%s' % name
if is_memory:
uri = uri + '?mode=memory&cache=shared'
conn = sqlite3.connect(uri, uri=True, timeout=timeout, isolation_level=isolation_level)
if pragmas is None:
pragmas = []
if not isinstance(pragmas, list):
pragmas = [pragmas]
for pragma in pragmas:
conn.execute(pragma)
return conn
def work1(name, is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None, loops=1):
conn = get_conn(name, is_memory=is_memory, timeout=timeout, isolation_level=isolation_level, pragmas=pragmas)
for i in range(loops):
conn.execute('INSERT INTO foo VALUES (1)')
def work2(name, is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None, loops=1):
conn = get_conn(name, is_memory=is_memory, timeout=timeout, isolation_level=isolation_level, pragmas=pragmas)
for i in range(loops):
len(conn.execute('SELECT * FROM foo').fetchall())
def main(name, is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None, loops=1, num_threads=16):
conn = get_conn(name, is_memory=is_memory, timeout=timeout, isolation_level=isolation_level, pragmas=pragmas)
try:
conn.execute('CREATE TABLE foo(a int)')
except sqlite3.OperationalError:
conn.execute('DROP TABLE foo')
conn.execute('CREATE TABLE foo(a int)')
threads = []
for i in range(num_threads):
threads.append(threading.Thread(target=work1, args=(name, is_memory, timeout, isolation_level, pragmas, loops)))
threads.append(threading.Thread(target=work2, args=(name, is_memory, timeout, isolation_level, pragmas, loops)))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
# In-Memory Tests
# All of these fail immediately with table is locked. There is no delay; timeout/busy_timeout has no effect.
main('a', is_memory=True, timeout=5, isolation_level='IMMEDIATE', pragmas=None)
main('b', is_memory=True, timeout=5, isolation_level='DEFERRED', pragmas=None)
main('c', is_memory=True, timeout=5, isolation_level='EXCLUSIVE', pragmas=None)
main('d', is_memory=True, timeout=5, isolation_level=None, pragmas=None)
main('e', is_memory=True, timeout=5, isolation_level='IMMEDIATE', pragmas=['PRAGMA busy_timeout = 10000'])
main('f', is_memory=True, timeout=5, isolation_level='DEFERRED', pragmas=['PRAGMA busy_timeout = 10000'])
main('g', is_memory=True, timeout=5, isolation_level='EXCLUSIVE', pragmas=['PRAGMA busy_timeout = 10000'])
main('h', is_memory=True, timeout=5, isolation_level=None, pragmas=['PRAGMA busy_timeout = 10000'])
main('i', is_memory=True, timeout=5, isolation_level='IMMEDIATE', pragmas=['PRAGMA read_uncommitted=TRUE'])
main('j', is_memory=True, timeout=5, isolation_level='DEFERRED', pragmas=['PRAGMA read_uncommitted=TRUE'])
main('k', is_memory=True, timeout=5, isolation_level='EXCLUSIVE', pragmas=['PRAGMA read_uncommitted=TRUE'])
# This is the only successful operation, when isolation_level = None and PRAGMA read_uncommitted=TRUE
main('l', is_memory=True, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'])
# These start to take a really long time
main('m', is_memory=True, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'], loops=100)
main('n', is_memory=True, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'], loops=100, num_threads=128)
# None of the on disk DB's ever fail:
main('o', is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None)
main('p', is_memory=False, timeout=5, isolation_level='DEFERRED', pragmas=None)
main('q', is_memory=False, timeout=5, isolation_level='EXCLUSIVE', pragmas=None)
main('r', is_memory=False, timeout=5, isolation_level=None, pragmas=None)
main('s', is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=['PRAGMA busy_timeout = 10000'])
main('t', is_memory=False, timeout=5, isolation_level='DEFERRED', pragmas=['PRAGMA busy_timeout = 10000'])
main('u', is_memory=False, timeout=5, isolation_level='EXCLUSIVE', pragmas=['PRAGMA busy_timeout = 10000'])
main('v', is_memory=False, timeout=5, isolation_level=None, pragmas=['PRAGMA busy_timeout = 10000'])
main('w', is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=['PRAGMA read_uncommitted=TRUE'])
main('x', is_memory=False, timeout=5, isolation_level='DEFERRED', pragmas=['PRAGMA read_uncommitted=TRUE'])
main('y', is_memory=False, timeout=5, isolation_level='EXCLUSIVE', pragmas=['PRAGMA read_uncommitted=TRUE'])
main('z', is_memory=False, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'])
# These actually fail with database is locked
main('aa', is_memory=False, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'], loops=100)
main('ab', is_memory=False, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'], loops=100, num_threads=128)