Моя проблема в том, что мы перенесли систему отчетов на блоки данных, и некоторые сборки терпят неудачу из-за существующих таблиц, старых версий таблиц, создает сбой, когда удаление не завершено; у нас также нет полной интеграции git, и некоторые среды более дикие, чем мне бы хотелось.
В результате необходима проверка схемы.
Поскольку я не смог получить ответ, мой окончательный код на случай, если кому-то еще понадобится нечто подобное. Поскольку я хотел проверить столбцы и типы столбцов, которые я переключил на словари, оказалось, что сравнивать словари просто.
Я не думаю, что это очень pythoni c и неэффективно, когда есть много таблиц. Я думаю, что для создания словаря из фрейма данных может потребоваться rdd в среде без базы данных.
Не стесняйтесь критиковать, поскольку я все еще учусь.
# table_list is a python dictionary in the form
# {tablename:{column1:column1_type, column2:column2_type, etc:etc}}
# When a table is added or changed in the build it should be added as a dictionary item.
table_list = {
'p23a_com': {'AGE_GROUP': 'string',
'UniqServReqID': 'string',
'Rec_Code': 'string',
'Elapsed_days': 'int'},
'p23b_com': {'AGE_GROUP': 'string',
'UniqServReqID': 'string',
'Org_code': 'string',
'Elapsed_days': 'int'}}
# Function to get the schema for a table
# Function takes the database name and the table name and returns a dictionary of the columns and data types.
def get_table_schema(dbase,table_name):
desc_query = dbase+"."+table_name
df_tab_cols = spark.createDataFrame(spark.table(desc_query).dtypes)
tab_cols_dict = dict(map(lambda row: (row[0],row[1]), df_tab_cols.collect()))
return tab_cols_dict
# This is the test, it cycles through the table_list dictionary returning the columns and types and then doing a dictionary compare.
# The query will fail on any missing table or any table with incorrect columns.
for tab in table_list:
tab_cols_d = get_table_schema(db,tab)
assert tab_cols_d == table_list[tab]