Блоки данных -Test SQL созданная таблица существует с правильными столбцами - PullRequest
0 голосов
/ 04 февраля 2020

Я пытаюсь создать тест в Databricks, который проверяет, правильно ли создан набор таблиц с правильными столбцами. Такое ощущение, что это должно быть просто, но я не могу найти решение asp, все перенесено с Oracle и мой фон Oracle и SQL, а не python.

Например, представьте следующую таблицу примеров, которая будет заполнена данными панели мониторинга. Если он уже существует с другой структурой, сценарии создания отчетов завершатся неудачно.

%sql

CREATE TABLE IF NOT EXISTS $report_schema.p23a_com 
(AGE_GROUP                                STRING,
 UniqServReqID                            STRING,
 Rec_Code                                 STRING,
 Elapsed_days                             INT)
USING delta 
PARTITIONED BY (AGE_GROUP)

Часть теста следующая, но очевидно, что утверждение не выполнено из-за информации столбца раздела. Кажется, я не могу сделать DESCRIBE менее многословным, я мог бы удалить # из списка ввода, но это выглядит грязно и усложняет, когда я расширяю тест, чтобы подобрать тип данных. Есть ли лучший способ захватить схему?

def get_table_schema(dbase,table_name):

    desc_query="DESCRIBE "+dbase+"."+table_name
    df_tab_cols = sqlContext.sql(desc_query)
    return df_tab_cols

def test_table_schema(tab_cols,list_tab_cols):

    input_col_list = df_tab_cols.select("col_name").rdd.map(lambda row : row[0]).collect()
    assert set(input_col_list) == set(list_tab_cols)

db = report_schema
table = "p23a_com"
cols = ["AGE_GROUP","UniqServReqID","Rec_Code","Elapsed_days"]

df_tab_cols = get_table_schema(db,table)

test_table_schema(df_tab_cols,cols)


Ответы [ 2 ]

1 голос
/ 05 февраля 2020

Ответ - я немного толстый и слишком SQL сфокусированный. Вместо использования SQL Опишите все, что мне нужно было сделать, это прочитать таблицу непосредственно в фрейм данных с помощью spark, а затем использовать столбцы.

ie

def get_table_schema(dbase,table_name):

    desc_query = dbase+"."+table_name
    df_tab_cols = spark.table(desc_query)
    return df_tab_cols

def test_table_schema(tab_cols,list_tab_cols):

    input_col_list = list(df_tab_cols.columns) 
    assert set(input_col_list) == set(list_tab_cols)

    print(input_col_list)
0 голосов
/ 11 февраля 2020

Моя проблема в том, что мы перенесли систему отчетов на блоки данных, и некоторые сборки терпят неудачу из-за существующих таблиц, старых версий таблиц, создает сбой, когда удаление не завершено; у нас также нет полной интеграции git, и некоторые среды более дикие, чем мне бы хотелось.

В результате необходима проверка схемы.

Поскольку я не смог получить ответ, мой окончательный код на случай, если кому-то еще понадобится нечто подобное. Поскольку я хотел проверить столбцы и типы столбцов, которые я переключил на словари, оказалось, что сравнивать словари просто.

Я не думаю, что это очень pythoni c и неэффективно, когда есть много таблиц. Я думаю, что для создания словаря из фрейма данных может потребоваться rdd в среде без базы данных.

Не стесняйтесь критиковать, поскольку я все еще учусь.

# table_list is a python dictionary in the form 
# {tablename:{column1:column1_type, column2:column2_type, etc:etc}} 
# When a table is added or changed in the build it should be added as a dictionary item.

table_list = {
 'p23a_com': {'AGE_GROUP': 'string',
    'UniqServReqID': 'string',
    'Rec_Code': 'string',
    'Elapsed_days': 'int'},
 'p23b_com': {'AGE_GROUP': 'string',
    'UniqServReqID': 'string',
    'Org_code': 'string',
    'Elapsed_days': 'int'}}


# Function to get the schema for a table
# Function takes the database name and the table name and returns a dictionary of the columns and data types.

def get_table_schema(dbase,table_name):

    desc_query = dbase+"."+table_name   
    df_tab_cols = spark.createDataFrame(spark.table(desc_query).dtypes)
    tab_cols_dict = dict(map(lambda row: (row[0],row[1]), df_tab_cols.collect())) 
    return tab_cols_dict

# This is the test, it cycles through the table_list dictionary returning the columns and types and then doing a dictionary compare.
# The query will fail on any missing table or any table with incorrect columns.

for tab in table_list:

    tab_cols_d = get_table_schema(db,tab)
    assert tab_cols_d == table_list[tab] 
...