Как сделать upsert с SqlAlchemy? - PullRequest
       21

Как сделать upsert с SqlAlchemy?

47 голосов
/ 23 августа 2011

У меня есть запись о том, что я хочу существовать в базе данных, если ее там нет, и если она уже есть (первичный ключ существует), я хочу, чтобы поля были обновлены до текущего состояния.Это часто называют upsert .

Следующий фрагмент неполного кода демонстрирует, что будет работать, но выглядит чрезмерно неуклюжим (особенно если столбцов было намного больше).Что лучше / лучше?

Base = declarative_base()
class Template(Base):
    __tablename__ = 'templates'
    id = Column(Integer, primary_key = True)
    name = Column(String(80), unique = True, index = True)
    template = Column(String(80), unique = True)
    description = Column(String(200))
    def __init__(self, Name, Template, Desc):
        self.name = Name
        self.template = Template
        self.description = Desc

def UpsertDefaultTemplate():
    sess = Session()
    desired_default = Template("default", "AABBCC", "This is the default template")
    try:
        q = sess.query(Template).filter_by(name = desiredDefault.name)
        existing_default = q.one()
    except sqlalchemy.orm.exc.NoResultFound:
        #default does not exist yet, so add it...
        sess.add(desired_default)
    else:
        #default already exists.  Make sure the values are what we want...
        assert isinstance(existing_default, Template)
        existing_default.name = desired_default.name
        existing_default.template = desired_default.template
        existing_default.description = desired_default.description
    sess.flush()

Есть ли лучший или менее подробный способ сделать это?Что-то вроде этого было бы замечательно:

sess.upsert_this(desired_default, unique_key = "name")

, хотя kwarg unique_key явно не нужен (ORM должен быть в состоянии легко это выяснить) Я добавил это только потому, что SQLAlchemy имеет тенденцию работать только с первичнымключ.Например: я смотрел, будет ли Session.merge применимым, но это работает только с первичным ключом, который в данном случае является автоинкрементным идентификатором, который не очень полезен для этой цели.

Пример использования этого варианта - просто при запуске серверного приложения, которое могло обновить ожидаемые данные по умолчанию.то есть: нет проблем параллелизма для этого upsert.

Ответы [ 7 ]

43 голосов
/ 23 августа 2011

SQLAlchemy имеет поведение «сохранить или обновить», которое в последних версиях было встроено в session.add, но ранее это был отдельный вызов session.saveorupdate. Это не «упущение», но может быть достаточно для ваших нужд.

Хорошо, что вы спрашиваете о классе с несколькими уникальными ключами; Я считаю, что именно поэтому нет единственно правильного способа сделать это. Первичный ключ также является уникальным ключом. Если бы не было уникальных ограничений, только первичный ключ, это было бы достаточно простой проблемой: если ничего с данным ID не существует, или если ID - None, создайте новую запись; иначе обновите все остальные поля в существующей записи с этим первичным ключом.

Однако при наличии дополнительных уникальных ограничений возникают логические проблемы с этим простым подходом. Если вы хотите «сохранить» объект, и первичный ключ вашего объекта соответствует существующей записи, а другой уникальный столбец соответствует другой записи , то что вы будете делать? Аналогично, если первичный ключ не соответствует ни одной существующей записи, но другой уникальный столбец соответствует ли существующей записи, тогда что? Может быть правильный ответ для вашей конкретной ситуации, но в целом я бы сказал, что единого правильного ответа не существует.

Это может быть причиной отсутствия встроенной операции "upsert". Приложение должно определить, что это означает в каждом конкретном случае.

18 голосов
/ 06 июня 2017

SQLAlchemy поддерживает ON CONFLICT теперь двумя способами on_conflict_do_update() и on_conflict_do_nothing():

Копирование из документации:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
    )
conn.execute(stmt)

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert

8 голосов
/ 19 октября 2017

Я использую подход «посмотри, прежде чем прыгнуть»:

# first get the object from the database if it exists
# we're guaranteed to only get one or zero results
# because we're filtering by primary key
switch_command = session.query(Switch_Command).\
    filter(Switch_Command.switch_id == switch.id).\
    filter(Switch_Command.command_id == command.id).first()

# If we didn't get anything, make one
if not switch_command:
    switch_command = Switch_Command(switch_id=switch.id, command_id=command.id)

# update the stuff we care about
switch_command.output = 'Hooray!'
switch_command.lastseen = datetime.datetime.utcnow()

session.add(switch_command)
# This will generate either an INSERT or UPDATE
# depending on whether we have a new object or not
session.commit()

Преимущество в том, что он нейтрален по отношению к БД, и я думаю, что его легко читать.Недостатком является то, что есть потенциальное состояние гонки в сценарии, подобном следующему:

  • мы запрашиваем базу данных для switch_command и ненайти один
  • мы создаем switch_command
  • другой процесс или поток создает switch_command с тем же первичным ключом, что и наш
  • мы пытаемся зафиксировать наш switch_command
5 голосов
/ 28 июля 2018

В настоящее время SQLAlchemy предоставляет две полезные функции on_conflict_do_nothing и on_conflict_do_update.Эти функции полезны, но требуют перехода с интерфейса ORM на интерфейс более низкого уровня - SQLAlchemy Core .

Хотя эти две функции затрудняют использование синтаксиса SQLAlchemy не так сложно, эти функциидалеки от предоставления полного готового решения для апсертирования.

Мой распространенный вариант использования - сохранить большой кусок строк в одном выполнении SQL-запроса / сеанса.Я обычно сталкиваюсь с двумя проблемами с апсертингом:

Например, функции ORM более высокого уровня, к которым мы привыкли, отсутствуют.Вы не можете использовать объекты ORM, но вместо этого должны предоставить ForeignKey s во время вставки.

Я использую эту следующую функцию, которую я написал для решения обеих этих проблем:

def upsert(session, model, rows):
    table = model.__table__
    stmt = postgresql.insert(table)
    primary_keys = [key.name for key in inspect(table).primary_key]
    update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}

    if not update_dict:
        raise ValueError("insert_or_update resulted in an empty update_dict")

    stmt = stmt.on_conflict_do_update(index_elements=primary_keys,
                                      set_=update_dict)

    seen = set()
    foreign_keys = {col.name: list(col.foreign_keys)[0].column for col in table.columns if col.foreign_keys}
    unique_constraints = [c for c in table.constraints if isinstance(c, UniqueConstraint)]
    def handle_foreignkeys_constraints(row):
        for c_name, c_value in foreign_keys.items():
            foreign_obj = row.pop(c_value.table.name, None)
            row[c_name] = getattr(foreign_obj, c_value.name) if foreign_obj else None

        for const in unique_constraints:
            unique = tuple([const,] + [row[col.name] for col in const.columns])
            if unique in seen:
                return None
            seen.add(unique)

        return row

    rows = list(filter(None, (handle_foreignkeys_constraints(row) for row in rows)))
    session.execute(stmt, rows)
1 голос
/ 26 марта 2019

Для меня ниже работает отлично с базой данных красного смещения, а также для комбинированного ограничения первичного ключа.

SOURCE : this

JustДля создания движка SQLAlchemy в функции def start_engine ()

from sqlalchemy import Column, Integer, Date ,Metadata
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects import postgresql

Base = declarative_base()

def start_engine():
    engine = create_engine(os.getenv('SQLALCHEMY_URI', 
    'postgresql://localhost:5432/upsert'))
     connect = engine.connect()
    meta = MetaData(bind=engine)
    meta.reflect(bind=engine)
    return engine


class DigitalSpend(Base):
    __tablename__ = 'digital_spend'
    report_date = Column(Date, nullable=False)
    day = Column(Date, nullable=False, primary_key=True)
    impressions = Column(Integer)
    conversions = Column(Integer)

    def __repr__(self):
        return str([getattr(self, c.name, None) for c in self.__table__.c])


def compile_query(query):
    compiler = query.compile if not hasattr(query, 'statement') else 
  query.statement.compile
    return compiler(dialect=postgresql.dialect())


def upsert(session, model, rows, as_of_date_col='report_date', no_update_cols=[]):
    table = model.__table__

    stmt = insert(table).values(rows)

    update_cols = [c.name for c in table.c
                   if c not in list(table.primary_key.columns)
                   and c.name not in no_update_cols]

    on_conflict_stmt = stmt.on_conflict_do_update(
        index_elements=table.primary_key.columns,
        set_={k: getattr(stmt.excluded, k) for k in update_cols},
        index_where=(getattr(model, as_of_date_col) < getattr(stmt.excluded, as_of_date_col))
        )

    print(compile_query(on_conflict_stmt))
    session.execute(on_conflict_stmt)


session = start_engine()
upsert(session, DigitalSpend, initial_rows, no_update_cols=['conversions'])
требуется несколько модификаций
1 голос
/ 08 ноября 2018

Это работает для меня с sqlite3 и postgres.Хотя это может произойти сбой с объединенными ограничениями первичного ключа и, скорее всего, произойдет сбой с дополнительными уникальными ограничениями.

    try:
        t = self._meta.tables[data['table']]
    except KeyError:
        self._log.error('table "%s" unknown', data['table'])
        return

    try:
        q = insert(t, values=data['values'])
        self._log.debug(q)
        self._db.execute(q)
    except IntegrityError:
        self._log.warning('integrity error')
        where_clause = [c.__eq__(data['values'][c.name]) for c in t.c if c.primary_key]
        update_dict = {c.name: data['values'][c.name] for c in t.c if not c.primary_key}
        q = update(t, values=update_dict).where(*where_clause)
        self._log.debug(q)
        self._db.execute(q)
    except Exception as e:
        self._log.error('%s: %s', t.name, e)
0 голосов
/ 05 апреля 2019

Это позволяет получить доступ к базовым моделям на основе имен строк

def get_class_by_tablename(tablename):
  """Return class reference mapped to table.
  https://stackoverflow.com/questions/11668355/sqlalchemy-get-model-from-table-name-this-may-imply-appending-some-function-to
  :param tablename: String with name of table.
  :return: Class reference or None.
  """
  for c in Base._decl_class_registry.values():
    if hasattr(c, '__tablename__') and c.__tablename__ == tablename:
      return c


sqla_tbl = get_class_by_tablename(table_name)

def handle_upsert(record_dict, table):
    """
    handles updates when there are primary key conflicts

    """
    try:
        self.active_session().add(table(**record_dict))
    except:
        # Here we'll assume the error is caused by an integrity error
        # We do this because the error classes are passed from the
        # underlying package (pyodbc / sqllite) SQLAlchemy doesn't mask
        # them with it's own code - this should be updated to have
        # explicit error handling for each new db engine

        # <update>add explicit error handling for each db engine</update> 
        active_session.rollback()
        # Query for conflic class, use update method to change values based on dict
        c_tbl_primary_keys = [i.name for i in table.__table__.primary_key] # List of primary key col names
        c_tbl_cols = dict(sqla_tbl.__table__.columns) # String:Col Object crosswalk

        c_query_dict = {k:record_dict[k] for k in c_tbl_primary_keys if k in record_dict} # sub-dict from data of primary key:values
        c_oo_query_dict = {c_tbl_cols[k]:v for (k,v) in c_query_dict.items()} # col-object:query value for primary key cols

        c_target_record = session.query(sqla_tbl).filter(*[k==v for (k,v) in oo_query_dict.items()]).first()

        # apply new data values to the existing record
        for k, v in record_dict.items()
            setattr(c_target_record, k, v)
...