Сценарий ниже отражает мои обновленные, отредактированные попытки (следуя советам ниже) заполнить таблицы измерений строками из таблиц в оперативной базе данных, при условии, что первичные ключи из PANDAS DataFrame созданы из объединения столбцов идентификаторов из соответствующих таблиц в OPDB, отсутствуют в таблицах измерений.
import mysql.connector
import pandas as pd
...
op_cursor = op_connector.cursor
dwh_cursor = dwh_connector.cursor
...
class dimension_table:
def __init__(self, dwh_cols, op_cols, dim_id, dwh_table_name, op_table_name,op_args=None, dwh_args=None):
self.dwh_cols = ('')
self.op_cols = ('')
self.dim_id = dim_id
self.dwh_table_name = dwh_table_name
self.op_table_name = '`*opdb.*`.' + op_table_name
self.op_args = ",".join(op_cols)
self.dwh_args = ",".join(dwh_cols)
...
billing_address_data = dimension_table(("id","address", "alias", "postal_code", "type", "city", "country",
"geolocation"),
("id","address", "alias", "postal_code", "type", "city", "country",
"geolocation"),
billing_address_dim_id,'billing_address_dim', 'billing_address')
...
def load_dim(instance):
sql = """INSERT INTO {dwh} ({dwh_cols})
SELECT {op_cols}
FROM {op}
WHERE {pk} NOT IN
(SELECT {pk} FROM {dwh} WHERE id = %s)
LIMIT 1
"""
for key in instance.dim_id:
try:
# ID APPEND
dwh_cursor.execute(sql.format(dwh = instance.dwh_table_name,
dwh_cols = instance.dwh_args,
op_cols = instance.op_args,
op = instance.op_table_name,
pk = 'id'),
str(key))
dwh_connector.commit()
except mysql.connector.ProgrammingError as err:
# ORDER_ID APPEND
dwh_cursor.execute(sql.format(dwh = instance.dwh_table_name,
dwh_cols = instance.dwh_args,
op_cols = instance.op_args,
op = instance.op_table_name,
pk = 'order_id'),
str(key))
dwh_connector.commit()
billing_profile_op_id = dwh_cursor.lastrowid
...
load_dim(order_items_data)
Моя последняя проблема - ошибка, связанная с выполнением последней строки кода в сценарии, load_dim(order_items_data)
. это таблица order_items с PK order_id.
ProgrammingError: 1054 (42S22): неизвестный столбец 'id' в 'выражении where'