У меня есть требование создать промежуточную таблицу в нашем хранилище данных, работающем на PostgreSQL, и импортировать данные с нашего веб-сайта Magento, использующего базу данных MySQL, я пытаюсь использовать Python.
Я создал приведенный ниже запрос для цели импорта:
1. Не могли бы вы проверить и подтвердить, нормально ли это?Или есть какие-то альтернативные методы для этого?
2. Также я хочу знать, как мы можем управлять проблемами несоответствия DATATYPE при портировании?
3. Есть ли что-нибудь, что мы можем сделать с посторонним рэппером данных (FDW), например multicorn? Как мы можем это сделать? Мне нужно всего несколько столбцов из источника (в источнике более 50 столбцов и мне нужно только 15 столбцов), чтобыпереведен в пункт назначения, поэтому FDW будет работать?
Если кто-то может опубликовать образец или отредактировать приведенный ниже код, это будет очень полезно.
import psycopg2
import os
import time
import MySQLdb
import sys
import mysql.connector
from pprint import pprint
from datetime import datetime
from psycopg2 import sql
#from utils.utils import get_global_config
def psql_command(msql, psql, msql_command, psql_command):
msql.execute(msql_command)
for row in cur_msql:
try:
psql.execute(command, row)
except psycopg2.Error as e:
print "Cannot execute the query!!", e.pgerror
sys.exit("Some problem occured with the query!!!")
def dB_Fetch():
try:
cnx_msql = mysql.connector.connect( host=host_mysql,
user=user_mysql,passwd=pswd_mysql, db=dbna_mysql )
except mysql.connector.Error as e:
print "MYSQL: Unable to connect!", e.msg
sys.exit(1)
# Postgresql connection
try:
cnx_psql = psycopg2.connect(conn_string_psql)
except psycopg2.Error as e:
print('PSQL: Unable to connect!\n{0}').format(e)
sys.exit(1)
# Cursors initializations
cur_msql = cnx_msql.cursor(dictionary=True)
cur_psql = cnx_psql.cursor()
try:
SQL_create_Staging_schema="""CREATE SCHEMA IF NOT EXISTS staging
AUTHORIZATION postgres;"""
SQL_create_sales_flat_quote="""CREATE TABLE IF NOT EXISTS
staging.sales_flat_quote
(
entity_id BIGINT
, store_id BIGINT
, customer_email TEXT
, customer_firstname TEXT
, customer_middlename TEXT
, customer_lastname TEXT
, customer_is_guest BIGINT
, customer_group_id BIGINT
, created_at TIMESTAMP WITHOUT TIME ZONE
, updated_at TIMESTAMP WITHOUT TIME ZONE
, is_active BIGINT
, items_count BIGINT
, items_qty BIGINT
, base_currency_code TEXT
, grand_total NUMERIC(12,4)
, base_to_global_rate NUMERIC(12,4)
, base_subtotal NUMERIC(12,4)
, base_subtotal_with_discount NUMERIC(12,4)
);"""
SQL_create_sales_flat_quote_item="""CREATE TABLE IF NOT EXISTS
staging.sales_flat_quote_item
( store_id INTEGER
, row_total NUMERIC
, updated_at TIMESTAMP WITHOUT TIME ZONE
, qty NUMERIC
, sku CHARACTER VARYING
, free_shipping INTEGER
, quote_id INTEGER
, price NUMERIC
, no_discount INTEGER
, item_id INTEGER
, product_type CHARACTER VARYING
, base_tax_amount NUMERIC
, product_id INTEGER
, name CHARACTER VARYING
, created_at TIMESTAMP WITHOUT TIME ZONE
);"""
print("Creating Schema")
cur_psql.execute(SQL_create_Staging_schema)
print("schema succesfully created")
print("Creating staging.sales_flat_quote table")
cur_psql.execute(SQL_create_sales_flat_quote)
print("staging.sales_flat_quote table succesfully created")
print("Creating staging.sales_flat_quote_item table")
cur_psql.execute(SQL_create_sales_flat_quote_item)
print("staging.sales_flat_quote_item table succesfully created")
cur_psql.commit();
print("Fetching data from source server")
commands = [
(
"SELECT customer_id,entity_id,store_id,created_at,updated_at
,items_count,base_row_total,row_total,base_discount_amount
,base_subtotal_with_discount,base_to_global_rate
,is_active from sales_flat_quote
where is_active=1;",
"INSERT INTO staging.sales_flat_quote
(customer_id,entity_id,store_id,created_at,updated_at
,items_count,base_row_total,row_total,base_discount_amount
,base_subtotal_with_discount,base_to_global_rate,is_active) \
VALUES (%(customer_id)s, %(entity_id)s
, %(store_id)s, %(created_at)s, %(updated_at)s
, %(items_count)s, %(base_row_total)s
, %(row_total)s, %(base_discount_amount)s
, %(base_subtotal_with_discount)s
, %(base_to_global_rate)s, %(is_active)s)"
),
(
"SELECT store_id,row_total,updated_at,qty,sku
,free_shipping,quote_id,price,no_discount,item_id,product_type
,base_tax_amount,product_id,name,created_at from
sales_flat_quote_item",
"INSERT INTO staging.sales_flat_quote_item
(store_id,row_total,updated_at,qty,sku,free_shipping
,quote_id,price,no_discount,item_id,product_type
,base_tax_amount,product_id,name,created_at)
VALUES (%(store_id)s, %(row_total)s, %(updated_at)s
, %(qty)s, %(sku)s, %(free_shipping)s, %(quote_id)s
, %(price)s, %(no_discount)s, %(item_id)s
, %(product_type)s, %(base_tax_amount)s, %(product_id)s
, %(name)s, % . (created_at)s)"
)
]
for msql_command, psql_command in commands:
psql_command(cur_msql, cur_psql, msql_command, psql_command)
except (Exception, psycopg2.Error) as error:
print ("Error while fetching data from PostgreSQL", error)
finally:
## Closing cursors
cur_msql.close()
cur_psql.close()
## Committing
cnx_psql.commit()
## Closing database connections
cnx_msql.close()
cnx_psql.close()
if __name__ == '__main__':
dB_Fetch()