У меня есть требование для извлечения количества строк каждой таблицы в базе данных кустов (которая имеет несколько схем). Я написал задание pyspark, которое извлекает счетчики для каждой таблицы, оно прекрасно работает, когда я пытаюсь использовать некоторые схемы, но не удается из-за ошибки заголовка GV, когда я пытаюсь использовать все схемы. я попытался создать объединение всех для всех запросов таблиц в базе данных, а также попытался объединить все для всех таблиц в схеме. оба произошли с ошибкой G C.
Можете ли вы посоветовать избежать этой ошибки. ниже мой сценарий:
# For loop for Schema starts here
for schema in schemas_list:
# Dataframe with all table names available in given Schema for level1 and level2
tables_1_df=tables_df(schema,1)
tables_1_list=formatted_list(tables_1_df,1)
tables_2_df=tables_df(schema,2)
tables_2_list=formatted_list(tables_2_df,2)
tables_list=list(set(tables_1_list) & set(tables_2_list)) #Intersection of level1 and level2 tables per Schema Name
# For loop for Tables starts her
for table in tables_list:
# Creating Dataframe with Row Count of given table for level 1 and level2
level_1_query=prep_query(schema, table, 1)
level_2_query=prep_query(schema, table, 2)
level_1_count_df=level_1_count_df.union(table_count(level_1_query))
level_1_count_df.persist()
level_2_count_df=level_2_count_df.union(table_count(level_2_query))
level_2_count_df.persist()
# Validate if level1 and level2 are re-conciled, if not write the row into data frame which will intern write into file in S3 Location
level_1_2_join_df = level_1_count_df.alias("one").join(level_2_count_df.alias("two"),(level_1_count_df.schema_name==level_2_count_df.schema_name) & (level_1_count_df.table_name==level_2_count_df.table_name),'inner').select(col("one.schema_name"),col("two.table_name"),col("level_1_count"),col("level_2_count"))
main_df=header_df.union(level_1_2_join_df)
if extracttype=='DELTA':
main_df=main_df.filter(main_df.level_1_count!=main_df.level_2_count)
main_df=main_df.select(concat(col("schema_name"),lit(","),col("table_name"),lit(","),col("level_1_count"),lit(","),col("level_2_count")))
# creates file in temp location
file_output(main_df, tempfolder) # writes to txt file in hadoop