Sqlalchemy, похоже, не очень хорошо управляет небезопасными соединениями monetdb - PullRequest
0 голосов
/ 23 октября 2018

Что касается моего понимания sqlalchemy, если вы хотите выполнять запросы к базе данных из параллельных потоков в python / sqlalchemy, вы должны использовать парадигму scoped_session .

Однако я заметил, чтодля pymonetdb, который даже при использовании небольшого числа параллельных запросов часто зависает или не работает с различными ошибками, например, соединение уже закрыто, логические ошибки в ответах на запросы.Десять или более параллельных запросов почти всегда приводят к зависанию скрипта.

Я сравниваю поведение с postgresql / psycopg2, который не показывает никаких проблем.Я уже сообщал об этой проблеме sqlalchemy-monetdb , но мне также интересно, правильно ли я использую scoped_session's.Пожалуйста, посмотрите на мой тестовый скрипт.

import concurrent.futures
import sys

from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy import create_engine

"""
Usage: python test.py (m|p) (nr_of_queries)
m := monetdb/pymapi
p := postgresql/psycopg2


make sure you have MonetDB running (I was using v11.29.3 "Mar2018")
and a postgresql server. (I was using  PostgreSQL 9.5.14 on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 5.4.0-6ubuntu1~16.04.10) 5.4.0 20160609, 64-bit)

On both servers make sure the database 'devdb' is present and accesible.

make sure you have installed
- pymonetdb==1.0.6
- SQLAlchemy==1.0.11
- sqlalchemy-monetdb==0.9.3
- psycopg2==2.7.5
"""

def _pool_task(sql, Session):

    result = None
    session = Session()
    result = session.execute(sql)
    return result



if sys.argv[1] == "m":
    engine = create_engine('monetdb:///devdb')
elif sys.argv[1] == "p":
    engine = create_engine('postgresql://frank:frankspassword@localhost/devdb')
else:
    raise ValueError("Unknown database {}".format(sys.argv[1]))

# The scoped_session object 'Session' is a factory to generate thread_local sqlalchemy.orm.session.Session object
Session = scoped_session(sessionmaker(autocommit=False, autoflush=True, bind=engine))

nr_queries = int(sys.argv[2])

queries = ["select sum(value) + {} from generate_series(1, 100) as foo(value);".format(i) for i in range(0, nr_queries)]

results = []

with concurrent.futures.ThreadPoolExecutor(max_workers=len(queries)) as executor:
    for future in concurrent.futures.as_completed([executor.submit(_pool_task, query, Session) for query in queries]):
        results.append(future.result())

data = [c.fetchall() for c in results]

print(data)
...