Массовая вставка с SQLAlchemy ORM - PullRequest
95 голосов
/ 07 сентября 2010

Есть ли способ заставить SQLAlchemy выполнять массовую вставку вместо вставки каждого отдельного объекта.то есть

делает:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

вместо:

INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)

Я только что преобразовал некоторый код для использования sqlalchemy, а не raw sql, и хотя сейчасгораздо приятнее работать с ним сейчас, кажется, медленнее (до 10 раз), мне интересно, в этом ли причина.

Может быть, я мог бы улучшить ситуацию, используя сеансы более эффективно.На данный момент у меня есть autoCommit=False и я делаю session.commit() после того, как я добавил кое-что.Хотя это, кажется, приводит к тому, что данные устаревают, если БД изменяется в другом месте, например, даже если я делаю новый запрос, я все равно получаю старые результаты?

Спасибо за вашу помощь!

Ответы [ 10 ]

124 голосов
/ 03 июля 2015

SQLAlchemy представила, что в версии 1.0.0:

Массовые операции - документы SQLAlchemy

С помощью этих операций вы теперь можете выполнять массовые вставки или обновления!

Например, вы можете сделать:

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

Здесь будет сделана массовая вставка.

28 голосов
/ 08 сентября 2010

Насколько я знаю, нет способа заставить ORM выдавать массовые вставки.Я полагаю, что основная причина заключается в том, что SQLAlchemy необходимо отслеживать идентичность каждого объекта (т. Е. Новые первичные ключи), и массовые вставки мешают этому.Например, если предположить, что ваша таблица foo содержит столбец id и сопоставлена ​​с классом Foo:

x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1

Поскольку SQLAlchemy выбрал значение для x.id без выполнения другого запроса, мыМожно сделать вывод, что он получил значение непосредственно из оператора INSERT.Если вам не требуется последующий доступ к созданным объектам через те же экземпляры * , вы можете пропустить слой ORM для вставки:

Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))

SQLAlchemy не может соответствовать этим новым строкамс любыми существующими объектами, поэтому вам придется заново запрашивать их для любых последующих операций.

Что касается устаревших данных, полезно помнить, что в сеансе нет встроенного способа узнать, когдабаза данных изменена вне сеанса.Чтобы получить доступ к измененным извне данным через существующие экземпляры, они должны быть помечены как expired .Это происходит по умолчанию на session.commit(), но это можно сделать вручную, вызвав session.expire_all() или session.expire(instance).Пример (SQL опущен):

x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42

session.commit() expires x, поэтому первый оператор print неявно открывает новую транзакцию и повторно запрашивает атрибуты x.Если вы закомментируете первый оператор печати, вы заметите, что второй теперь выбирает правильное значение, потому что новый запрос не генерируется до окончания обновления.

Это имеет смысл с точки зренияпредставление о транзакционной изоляции - вы должны выбирать только внешние модификации между транзакциями.Если это вызывает у вас проблемы, я бы предложил уточнить или переосмыслить границы транзакций вашего приложения вместо того, чтобы сразу же достигать session.expire_all().

20 голосов
/ 18 декабря 2015

Документы sqlalchemy имеют запись о производительности различных методов, которые можно использовать для массовых вставок:

ORM в основном не предназначены для высокопроизводительных массовых вставок -в этом вся причина того, что SQLAlchemy предлагает ядро ​​в дополнение к ORM в качестве компонента первого класса.

В случае использования быстрых массовых вставок система генерации и выполнения SQL, которую ORM строит поверх, являетсячасть ядра.Используя эту систему напрямую, мы можем создать INSERT, который конкурирует с непосредственным использованием API необработанной базы данных.

В качестве альтернативы, SQLAlchemy ORM предлагает набор методов Bulk Operations, которые обеспечивают привязки к подразделам единицы работы.процесс для генерации конструкций INSERT и UPDATE на уровне ядра с небольшой степенью автоматизации на основе ORM.

Пример ниже иллюстрирует основанные на времени тесты для нескольких различных способов вставки строк, от самых автоматизированных дов мере.В cPython 2.7 наблюдаемое время выполнения:

classics-MacBook-Pro:sqlalchemy classic$ python test.py
SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs
SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs
SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs
SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs
sqlite3: Total time for 100000 records 0.487842082977 sec

Сценарий:

import time
import sqlite3

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String,  create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
    global engine
    engine = create_engine(dbname, echo=False)
    DBSession.remove()
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


def test_sqlalchemy_orm(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer()
        customer.name = 'NAME ' + str(i)
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_pk_given(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer(id=i+1, name="NAME " + str(i))
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM pk given: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_bulk_insert(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    n1 = n
    while n1 > 0:
        n1 = n1 - 10000
        DBSession.bulk_insert_mappings(
            Customer,
            [
                dict(name="NAME " + str(i))
                for i in xrange(min(10000, n1))
            ]
        )
    DBSession.commit()
    print(
        "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )
    print(
        "SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def init_sqlite3(dbname):
    conn = sqlite3.connect(dbname)
    c = conn.cursor()
    c.execute("DROP TABLE IF EXISTS customer")
    c.execute(
        "CREATE TABLE customer (id INTEGER NOT NULL, "
        "name VARCHAR(255), PRIMARY KEY(id))")
    conn.commit()
    return conn


def test_sqlite3(n=100000, dbname='sqlite3.db'):
    conn = init_sqlite3(dbname)
    c = conn.cursor()
    t0 = time.time()
    for i in xrange(n):
        row = ('NAME ' + str(i),)
        c.execute("INSERT INTO customer (name) VALUES (?)", row)
    conn.commit()
    print(
        "sqlite3: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " sec")

if __name__ == '__main__':
    test_sqlalchemy_orm(100000)
    test_sqlalchemy_orm_pk_given(100000)
    test_sqlalchemy_orm_bulk_insert(100000)
    test_sqlalchemy_core(100000)
    test_sqlite3(100000)
13 голосов
/ 26 апреля 2017

Я обычно делаю это, используя add_all.

from app import session
from models import User

objects = [User(name="u1"), User(name="u2"), User(name="u3")]
session.add_all(objects)
session.commit()
10 голосов
/ 01 декабря 2014

Прямая поддержка была добавлена ​​в SQLAlchemy, начиная с версии 0.8

Согласно документам , connection.execute(table.insert().values(data)) должно помочь.(Обратите внимание, что это , а не , то же самое, что и connection.execute(table.insert(), data), что приводит к множеству отдельных вставок строк посредством вызова executemany).На любом другом соединении, кроме локальной, разница в производительности может быть огромной.

7 голосов
/ 30 мая 2016

Ответ Пьера правильный, но одна проблема заключается в том, что bulk_save_objects по умолчанию не возвращает первичные ключи объектов, если это вас беспокоит. Установите return_defaults на True, чтобы получить это поведение.

Документация здесь .

foos = [Foo(bar='a',), Foo(bar='b'), Foo(bar='c')]
session.bulk_save_objects(foos, return_defaults=True)
for foo in foos:
    assert foo.id is not None
session.commit()
7 голосов
/ 19 февраля 2016

SQLAlchemy представила, что в версии 1.0.0:

Массовые операции - документы SQLAlchemy

С помощью этих операций вы теперь можете выполнять массовые вставки или обновления!

Например (если вы хотите наименьшие издержки для простых INSERT таблиц), вы можете использовать Session.bulk_insert_mappings():

loadme = [
        (1, 'a')
    ,   (2, 'b')
    ,   (3, 'c')
    ]

dicts = []
for i in range(len(loadme)):
    dicts.append(dict(bar=loadme[i][0], fly=loadme[i][1]))

s = Session()
s.bulk_insert_mappings(Foo, dicts)
s.commit()

Или, если хотите, пропустить loadme кортежи и записи словарей непосредственно в dicts (но мне проще не использовать всю многословность данных и загрузить список словарей в цикле).

5 голосов
/ 28 марта 2015

Это способ:

values = [1, 2, 3]
Foo.__table__.insert().execute([{'bar': x} for x in values])

Это вставит так:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

Ссылка: SQLAlchemy FAQ содержит тесты для различных методов фиксации.

4 голосов
/ 13 июня 2018

Все дороги ведут в Рим , но некоторые из них пересекают горы, требуются паромы, но если вы хотите быстро туда добраться, просто езжайте по автостраде.


В этом случае автомагистраль должна использовать функцию execute_batch () из psycopg2 . Документация говорит это лучше всего:

Текущая реализация executemany() (с использованием чрезвычайно благотворительного занижения) не особенно эффективна. Эти функции могут использоваться для ускорения повторного выполнения оператора с набором параметров. Благодаря уменьшению количества обращений к серверу производительность может быть на несколько порядков выше, чем при использовании executemany().

В моем собственном тесте execute_batch() равен примерно вдвое быстрее , чем executemany() и дает возможность настроить размер страницы для дальнейшей настройки (если вы хотите сжать последние 2-3% производительность вне водителя).

Эту же функцию можно легко включить, если вы используете SQLAlchemy, установив use_batch_mode=True в качестве параметра при создании экземпляра движка с create_engine()

1 голос
/ 25 апреля 2018

Лучший ответ, который я нашел до сих пор, был в документации sqlalchemy:

http://docs.sqlalchemy.org/en/latest/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow

Существует полный пример теста возможных решений.

Как показанов документации:

bulk_save_objects - не лучшее решение, но его производительность верна.

Вторая лучшая реализация с точки зрения читабельности, я думаю, была с ядром SQLAlchemy:

def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
            [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )

Контекст этой функции приведен в статье документации.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...