Ошибка выполнения запроса через PySpark - G C Ошибка - PullRequest
0 голосов
/ 01 марта 2020

У меня есть требование для извлечения количества строк каждой таблицы в базе данных кустов (которая имеет несколько схем). Я написал задание pyspark, которое извлекает счетчики для каждой таблицы, оно прекрасно работает, когда я пытаюсь использовать некоторые схемы, но не удается из-за ошибки заголовка GV, когда я пытаюсь использовать все схемы. я попытался создать объединение всех для всех запросов таблиц в базе данных, а также попытался объединить все для всех таблиц в схеме. оба произошли с ошибкой G C.

Можете ли вы посоветовать избежать этой ошибки. ниже мой сценарий:

    # For loop for Schema starts here
for schema in schemas_list:

# Dataframe with all table names available in given Schema for level1 and level2
    tables_1_df=tables_df(schema,1)
    tables_1_list=formatted_list(tables_1_df,1)
    tables_2_df=tables_df(schema,2)
    tables_2_list=formatted_list(tables_2_df,2)
    tables_list=list(set(tables_1_list) & set(tables_2_list)) #Intersection of level1 and level2 tables per Schema Name

# For loop for Tables starts her
    for table in tables_list:

    # Creating Dataframe with Row Count of given table for level 1 and level2
        level_1_query=prep_query(schema, table, 1)
        level_2_query=prep_query(schema, table, 2)
        level_1_count_df=level_1_count_df.union(table_count(level_1_query))
        level_1_count_df.persist()
        level_2_count_df=level_2_count_df.union(table_count(level_2_query))
        level_2_count_df.persist()

# Validate if level1 and level2 are re-conciled, if not write the row into data frame which will intern write into file in S3 Location
level_1_2_join_df = level_1_count_df.alias("one").join(level_2_count_df.alias("two"),(level_1_count_df.schema_name==level_2_count_df.schema_name) & (level_1_count_df.table_name==level_2_count_df.table_name),'inner').select(col("one.schema_name"),col("two.table_name"),col("level_1_count"),col("level_2_count"))
main_df=header_df.union(level_1_2_join_df)
if extracttype=='DELTA':
    main_df=main_df.filter(main_df.level_1_count!=main_df.level_2_count)
main_df=main_df.select(concat(col("schema_name"),lit(","),col("table_name"),lit(","),col("level_1_count"),lit(","),col("level_2_count")))

    # creates file in temp location
file_output(main_df, tempfolder) # writes to txt file in hadoop
...