Это совсем не элегантно, но в итоге я использовал этот метод:
Считайте type()
первого элемента в каждом сигнале и преобразуйте его в str
, который позже передается функции, чтобы в основном соответствовать соответствующему типу данных PostgreSQL. Обратите внимание, что охватываются только numpy
типы данных, относящиеся к моему проекту.
def db_data_type(data_type):
# Input is string of data type
conversion_table = {
"<class 'numpy.uint8'>": 'int2', # 0 to 255 # -32768 to 32767
"<class 'numpy.uint16'>": 'int4', # 0 to 65535 # -2147483648 to 2147483647
"<class 'numpy.uint32'>": 'int8', # 0 to 4924967295 # -9223372036854775808 to 9223372036854775807
"<class 'numpy.int8'>": 'int2', # -128 to 127
"<class 'numpy.int16'>": 'int2', # -32768 to 32767
"<class 'numpy.int32'>": 'int4', # -2147483648 to 2147483647
"<class 'numpy.float64'>": 'float8', # double
"<class 'numpy.bytes_'>": 'text',
}
return conversion_table[data_type]
В основном, когда я перебираю группу сигналов (столбцы в таблице базы данных), я получаю список имен сигналов (имена столбцов в БД) и список типов данных (типы данных столбцов в БД). А вот код для psycopg2
для генерации SQL-запроса:
def db_create_columns(conn, table, columns, types):
cur = conn.cursor()
for i in range(len(columns)):
sql_add_column = psycopg2.sql.SQL("""ALTER TABLE {} ADD COLUMN {} {} ;""") \
.format(psycopg2.sql.Identifier(table),
psycopg2.sql.Identifier(columns[i]),
psycopg2.sql.Identifier(types[i]))
try:
cur.execute(sql_add_column)
except Exception as e:
print('Error occurred when adding data column [%s].\n%s' % (columns[i], e))
cur.close()
conn.commit()
return
Здесь аргументы функции: подключение к БД, имя таблицы, список имен столбцов, список типов данных.
Если есть лучший способ сделать это, пожалуйста, не стесняйтесь указывать. Я был бы очень признателен.