Я создаю быстрый и грязный ETL с использованием Python с одного сервера базы данных (в настоящее время DB2) на другой (MSSQL).Я просто высаживаю данные, поэтому никаких преобразований не происходит.Код, который я написал, работает, но он должен сначала извлечь весь набор данных, а затем вставить весь набор данных в место назначения.
Я хотел бы создать решение, которое позволило бы мне указать число строк «x» для извлечения из источника и пакетировать их до места назначения.
Я уверен, что естьэлегантное решение, но я недостаточно знаком с Python. Я просто ищу рекомендации по реализации, методам или методам.
Я использую SQLAlchemy и pandas для выполнения этой задачи.Мои исходные и целевые таблицы идентичны (насколько это возможно, поскольку типы данных различаются в разных реализациях SQL).Я заполняю фрейм данных, затем массово вставляю данные, используя MetaData и automap_base.
Функция массовой вставки
def bulkInsert(engine, df, tableName, schemaName = 'dbo'):
metadata = MetaData()
metadata.reflect(engine, only = [tableName], schema = schemaName)
Base = automap_base(metadata = metadata)
Base.prepare()
tableToInsert = Base.classes[tableName]
conn = engine.connect()
Session = sessionmaker(bind = conn)
session = Session()
session.bulk_insert_mappings(tableToInsert, df.to_dict(orient="records"), render_nulls = True)
session.commit()
session.close()
conn.close()
Получение исходных данных
db2 = db2Connect(db2Server)
df = pd.read_sql(query, db2, coerce_float=False)
db2.close()
Настройкаdestination
engine = mssqlSAEngine(server, database)
Запустить массовую вставку, заменить NaN на NULL
bulkInsert(engine, df.where(pd.notnull(df), None), tableName)
У меня не было проблем с успешной вставкой данных.Однако, когда я подхожу к отметке в миллион строк, моей системе не хватает памяти, и данные начинают выполнять подкачку страниц.Естественно, производительность значительно снижается.
У нас есть другие инструменты (например, SSIS), но я ищу динамический метод.В SSIS я могу либо написать задачу сценария C #, чтобы в основном выполнить то, что я делаю здесь, в Python, либо создать собственные DFT для каждой таблицы.С помощью этого метода мне просто нужно передать источник и пункт назначения.