Что касается моего понимания sqlalchemy, если вы хотите выполнять запросы к базе данных из параллельных потоков в python / sqlalchemy, вы должны использовать парадигму scoped_session .
Однако я заметил, чтодля pymonetdb, который даже при использовании небольшого числа параллельных запросов часто зависает или не работает с различными ошибками, например, соединение уже закрыто, логические ошибки в ответах на запросы.Десять или более параллельных запросов почти всегда приводят к зависанию скрипта.
Я сравниваю поведение с postgresql / psycopg2, который не показывает никаких проблем.Я уже сообщал об этой проблеме sqlalchemy-monetdb , но мне также интересно, правильно ли я использую scoped_session's.Пожалуйста, посмотрите на мой тестовый скрипт.
import concurrent.futures
import sys
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy import create_engine
"""
Usage: python test.py (m|p) (nr_of_queries)
m := monetdb/pymapi
p := postgresql/psycopg2
make sure you have MonetDB running (I was using v11.29.3 "Mar2018")
and a postgresql server. (I was using PostgreSQL 9.5.14 on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 5.4.0-6ubuntu1~16.04.10) 5.4.0 20160609, 64-bit)
On both servers make sure the database 'devdb' is present and accesible.
make sure you have installed
- pymonetdb==1.0.6
- SQLAlchemy==1.0.11
- sqlalchemy-monetdb==0.9.3
- psycopg2==2.7.5
"""
def _pool_task(sql, Session):
result = None
session = Session()
result = session.execute(sql)
return result
if sys.argv[1] == "m":
engine = create_engine('monetdb:///devdb')
elif sys.argv[1] == "p":
engine = create_engine('postgresql://frank:frankspassword@localhost/devdb')
else:
raise ValueError("Unknown database {}".format(sys.argv[1]))
# The scoped_session object 'Session' is a factory to generate thread_local sqlalchemy.orm.session.Session object
Session = scoped_session(sessionmaker(autocommit=False, autoflush=True, bind=engine))
nr_queries = int(sys.argv[2])
queries = ["select sum(value) + {} from generate_series(1, 100) as foo(value);".format(i) for i in range(0, nr_queries)]
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=len(queries)) as executor:
for future in concurrent.futures.as_completed([executor.submit(_pool_task, query, Session) for query in queries]):
results.append(future.result())
data = [c.fetchall() for c in results]
print(data)