Мотивации
У меня есть данные, поступающие из источника, с которым я взаимодействую с пандами DataFrame.У меня есть датамодель с интерфейсом SQLAlchemy ORM.Ради MCVE я нормализовал модель данных в две таблицы:
channel
с метаданными о записях (небольшой объем, ~ 1 тыс. Строк); record
с записями, указывающими наchannel
(более высокий объем, 90 тыс. Строк / день).
Цель channel
- избежать повторения.То, что я хочу, это установить питонную вставку данных в таблицу record
, используя SQLAlchemy с ограничением, о котором источник данных не знает о channelid
.
Источник данных
Вот примерданные из источника (единственные данные, к которым у меня есть доступ):
import pandas as pd
recs = [
{'serial': '1618741320', 'source': 1, 'channel': 4, 'timestamp': pd.Timestamp('2019-01-01 08:35:00'), 'value': 12},
{'serial': '1350397285', 'source': 2, 'channel': 3, 'timestamp': pd.Timestamp('2019-01-01 09:20:00'), 'value': 37},
{'serial': '814387724', 'source': 2, 'channel': 1, 'timestamp': pd.Timestamp('2019-01-01 12:30:00'), 'value': 581},
{'serial': '545914014', 'source': 3, 'channel': 0, 'timestamp': pd.Timestamp('2019-01-01 01:45:00'), 'value': 0},
{'serial': '814387724', 'source': 0, 'channel': 5, 'timestamp': pd.Timestamp('2019-01-01 14:20:00'), 'value': 699}
]
data = pd.DataFrame(recs)
Здесь образец мета, сохраненный в channel
, который был извлечен из установки.
recs = [
{'channelid': 28, 'serial': '545914014', 'source': 3, 'channel': 0},
{'channelid': 73, 'serial': '1350397285', 'source': 2, 'channel': 3},
{'channelid': 239, 'serial': '1618741320', 'source': 1, 'channel': 4},
{'channelid': 245, 'serial': '814387724', 'source': 0, 'channel': 5},
{'channelid': 259, 'serial': '814387724', 'source': 2, 'channel': 1}
]
meta= pd.DataFrame(recs)
MCVE
Сначала давайте начнем с MCVE!
Мы определяем модель данных:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, Float, String, DateTime
from sqlalchemy import UniqueConstraint, ForeignKey
from sqlalchemy.orm import relationship
Base = declarative_base()
Engine = create_engine("postgresql://postgres:postgres@localhost:5432/postgres")
class Channel(Base):
__tablename__ = 'channel'
__table_args__ = (UniqueConstraint('serial', 'source', 'channel'),)
id = Column(Integer, primary_key=True)
serial = Column(String, nullable=False)
source = Column(Integer, nullable=False)
channel = Column(Integer, nullable=False)
class Record(Base):
__tablename__ = 'record'
__table_args__ = (UniqueConstraint('channelid', 'timestamp'),)
id = Column(Integer, primary_key=True)
channelid = Column(Integer, ForeignKey('channel.id'), nullable=False)
timestamp = Column(DateTime, nullable=False)
value = Column(Float, nullable=False)
channel = relationship("channel")
Base.metadata.drop_all(Engine)
Base.metadata.create_all(Engine)
И мы подаем таблицу channel
, чтобы отразить мета, которая у нас уже есть:
with Engine.connect() as dbcon:
dbcon.execute(Channel.__table__.insert(), meta.to_dict(orient='records'))
Проблема для решения
Теперь мы хотели бы легко вставить data
в record
таблицу, но, к сожалению, нам не хватает channelid
из нашего источника данных (который не знает об этом).Очевидно, что этот вызов не удался:
with Engine.connect() as dbcon:
with dbcon.begin() as dbtrans:
dbcon.execute(Record.__table__.insert(), data.to_dict(orient='records'))
dbtrans.commit()
Из-за:
IntegrityError: (psycopg2.errors.NotNullViolation) null value in column "channelid" violates not-null constraint
DETAIL: Failing row contains (6, null, 2019-01-01 08:35:00, 12).
[SQL: 'INSERT INTO record (timestamp, value) VALUES (%(timestamp)s, %(value)s)'] [parameters: ({'timestamp': Timestamp('2019-01-01 08:35:00'), 'value': 12}, {'timestamp': Timestamp('2019-01-01 09:20:00'), 'value': 37}, {'timestamp': Timestamp('2019-01-01 12:30:00'), 'value': 581}, {'timestamp': Timestamp('2019-01-01 01:45:00'), 'value': 0}, {'timestamp': Timestamp('2019-01-01 14:20:00'), 'value': 699})]
Мы могли бы обработать его с помощью панд:
meta = pd.read_sql("SELECT id AS channelid, serial, source, channel FROM channel;", Engine.connect())
full = data.merge(meta, on=['serial', 'source', 'channel'])
И предыдущий вызов будет работать, потому чтосоединение с channelid
сделано:
channel serial source timestamp value channelid
0 4 1618741320 1 2019-01-01 08:35:00 12 239
1 3 1350397285 2 2019-01-01 09:20:00 37 73
2 1 814387724 2 2019-01-01 12:30:00 581 259
3 0 545914014 3 2019-01-01 01:45:00 0 28
4 5 814387724 0 2019-01-01 14:20:00 699 245
Но это не тот путь, который, я думаю, должен быть решен, в основном потому, что он заставляет меня выполнять связывание с пандами вместо SQLAlchemy.
Iтакже пробовал это, но это совершенно неэффективно для набора данных из 90 тыс. записей:
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=Engine)
session = Session()
with session.begin_nested() as trans:
for rec in data.to_dict(orient='records'):
c = session.query(Channel).filter_by(**{k: rec.pop(k) for k in ['serial', 'source', 'channel']})[0]
r = Record(channelid=c.id, **rec)
session.add(r)
Это занимает почти в 100 раз больше, чем предыдущий метод с использованием DataFrame.
Вопрос
Я сосредоточил свою энергию на создании всеобъемлющего MCVE, потому что я лучше владею пандами, чем SQLAlchemy, и я не смог найти решение своей проблемы в документации по SQLAlchemy.
Мой вопрос: "Какмогу ли я разрешить channelid
, чтобы моя вставка была успешной, производительной и основанной на SQLAclhemy вместо панд? "
Не стесняйтесь комментировать, чтобы улучшить этот пост.То, что я ищу, является рациональным способом сделать это.Это может подразумевать обновление модели данных, у меня есть такая гибкость.